launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #27235
[Merge] ~cjwatson/launchpad:librarian-multi-swift into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:librarian-multi-swift into launchpad:master.
Commit message:
librarianserver: Support multiple Swift instances
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/404970
When migrating from one Swift instance to another (between clouds), moving all the production data takes a long time. Rather than having to wait for this, it's better if we can start using the new Swift instance in parallel with the data migration. To support this, allow the librarian to be configured with two sets of Swift credentials: in that case, it will read from both (preferring the newer instance), feed new files only into the newer instance, and garbage-collect from both instances.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:librarian-multi-swift into launchpad:master.
diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf
index 952f30d..749ecaa 100644
--- a/lib/lp/services/config/schema-lazr.conf
+++ b/lib/lp/services/config/schema-lazr.conf
@@ -1264,6 +1264,24 @@ os_tenant_name: none
# datatype: string
os_auth_version: 2.0
+# Connection information and secret for a previous Swift instance that we're
+# migrating away from, but should still read from if necessary.
+#
+# datatype: urlbase
+old_os_auth_url: none
+
+# datatype: string
+old_os_username: none
+
+# datatype: string
+old_os_password: none
+
+# datatype: string
+old_os_tenant_name: none
+
+# datatype: string
+old_os_auth_version: 2.0
+
# Mailman configuration. Most of this is configured in
# https://git.launchpad.net/lp-mailman instead; the entries here are only
diff --git a/lib/lp/services/librarianserver/librariangc.py b/lib/lp/services/librarianserver/librariangc.py
index af866ff..44f3042 100644
--- a/lib/lp/services/librarianserver/librariangc.py
+++ b/lib/lp/services/librarianserver/librariangc.py
@@ -7,6 +7,10 @@ from __future__ import absolute_import, print_function, unicode_literals
__metaclass__ = type
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
from datetime import (
datetime,
timedelta,
@@ -59,16 +63,17 @@ def file_exists(content_id):
"""
swift_enabled = getFeatureFlag('librarian.swift.enabled') or False
if swift_enabled:
- swift_connection = swift.connection_pool.get()
- container, name = swift.swift_location(content_id)
- try:
- swift.quiet_swiftclient(
- swift_connection.head_object, container, name)
- return True
- except swiftclient.ClientException as x:
- if x.http_status != 404:
- raise
- swift.connection_pool.put(swift_connection)
+ for connection_pool in swift.connection_pools:
+ swift_connection = connection_pool.get()
+ container, name = swift.swift_location(content_id)
+ try:
+ swift.quiet_swiftclient(
+ swift_connection.head_object, container, name)
+ return True
+ except swiftclient.ClientException as x:
+ if x.http_status != 404:
+ raise
+ connection_pool.put(swift_connection)
return os.path.exists(get_file_path(content_id))
@@ -84,16 +89,20 @@ def open_stream(content_id):
"""
swift_enabled = getFeatureFlag('librarian.swift.enabled') or False
if swift_enabled:
- try:
- swift_connection = swift.connection_pool.get()
- container, name = swift.swift_location(content_id)
- chunks = swift.quiet_swiftclient(
- swift_connection.get_object,
- container, name, resp_chunk_size=STREAM_CHUNK_SIZE)[1]
- return swift.SwiftStream(swift_connection, chunks)
- except swiftclient.ClientException as x:
- if x.http_status != 404:
- raise
+ for connection_pool in swift.connection_pools:
+ try:
+ swift_connection = connection_pool.get()
+ container, name = swift.swift_location(content_id)
+ chunks = swift.quiet_swiftclient(
+ swift_connection.get_object,
+ container, name, resp_chunk_size=STREAM_CHUNK_SIZE)[1]
+ return swift.SwiftStream(
+ connection_pool, swift_connection, chunks)
+ except swiftclient.ClientException as x:
+ if x.http_status == 404:
+ connection_pool.put(swift_connection)
+ else:
+ raise
path = get_file_path(content_id)
if os.path.exists(path):
return open(path, 'rb')
@@ -540,14 +549,15 @@ class UnreferencedContentPruner:
# Remove the file from Swift, if it hasn't already been.
if self.swift_enabled:
container, name = swift.swift_location(content_id)
- with swift.connection() as swift_connection:
- try:
- swift.quiet_swiftclient(
- swift_connection.delete_object, container, name)
- removed.append('Swift')
- except swiftclient.ClientException as x:
- if x.http_status != 404:
- raise
+ for connection_pool in swift.connection_pools:
+ with swift.connection(connection_pool) as swift_connection:
+ try:
+ swift.quiet_swiftclient(
+ swift_connection.delete_object, container, name)
+ removed.append('Swift')
+ except swiftclient.ClientException as x:
+ if x.http_status != 404:
+ raise
if removed:
log.debug3(
@@ -737,13 +747,19 @@ def delete_unwanted_disk_files(con):
def swift_files(max_lfc_id):
- """Generate the (container, name) of all files stored in Swift.
+ """Generate all files stored in all configured Swift instances.
- Results are yielded in numerical order.
+ For each file, yield (connection_pool, container, name). Results are
+ yielded in numerical order; if the same file is present in multiple
+ Swift instances, all copies of it are yielded before moving on to the
+ next file.
"""
final_container = swift.swift_location(max_lfc_id)[0]
- with swift.connection() as swift_connection:
+ with ExitStack() as stack:
+ swift_connections = [
+ stack.enter_context(swift.connection(connection_pool))
+ for connection_pool in swift.connection_pools]
# We generate the container names, rather than query the
# server, because the mock Swift implementation doesn't
# support that operation.
@@ -753,21 +769,24 @@ def swift_files(max_lfc_id):
container_num += 1
container = swift.SWIFT_CONTAINER_PREFIX + str(container_num)
seen_names = set()
- try:
- objs = sorted(
- swift.quiet_swiftclient(
- swift_connection.get_container,
- container, full_listing=True)[1],
- key=lambda x: [
- int(segment) for segment in x['name'].split('/')])
- for obj in objs:
- if obj['name'] not in seen_names:
- yield (container, obj)
- seen_names.add(obj['name'])
- except swiftclient.ClientException as x:
- if x.http_status == 404:
- continue
- raise
+ objs = []
+ for pool_index, swift_connection in enumerate(swift_connections):
+ try:
+ objs.extend([
+ (obj, pool_index)
+ for obj in swift.quiet_swiftclient(
+ swift_connection.get_container,
+ container, full_listing=True)[1]])
+ except swiftclient.ClientException as x:
+ if x.http_status == 404:
+ continue
+ raise
+ objs.sort(key=lambda x: (
+ [int(segment) for segment in x[0]['name'].split('/')], x[1]))
+ for obj, pool_index in objs:
+ if (obj['name'], pool_index) not in seen_names:
+ yield (swift.connection_pools[pool_index], container, obj)
+ seen_names.add((obj['name'], pool_index))
def delete_unwanted_swift_files(con):
@@ -799,7 +818,7 @@ def delete_unwanted_swift_files(con):
removed_count = 0
content_id = next_wanted_content_id = -1
- for container, obj in swift_files(max_lfc_id):
+ for connection_pool, container, obj in swift_files(max_lfc_id):
name = obj['name']
# We may have a segment of a large file.
@@ -833,14 +852,15 @@ def delete_unwanted_swift_files(con):
log.debug3(
"File %d not removed - created too recently", content_id)
else:
- with swift.connection() as swift_connection:
+ with swift.connection(connection_pool) as swift_connection:
try:
swift_connection.delete_object(container, name)
except swiftclient.ClientException as e:
if e.http_status != 404:
raise
log.debug3(
- 'Deleted ({0}, {1}) from Swift'.format(container, name))
+ 'Deleted (%s, %s) from Swift (%s)',
+ container, name, connection_pool.os_auth_url)
removed_count += 1
if next_wanted_content_id == content_id:
diff --git a/lib/lp/services/librarianserver/storage.py b/lib/lp/services/librarianserver/storage.py
index 89af9d6..51601bd 100644
--- a/lib/lp/services/librarianserver/storage.py
+++ b/lib/lp/services/librarianserver/storage.py
@@ -120,24 +120,33 @@ class LibrarianStorage:
log.msg('{} Swift download attempts, {} failures'.format(
self.swift_download_attempts, self.swift_download_fails))
- # First, try and stream the file from Swift.
+ # First, try and stream the file from Swift. Try the newest
+ # configured instance first.
container, name = swift.swift_location(fileid)
- swift_connection = swift.connection_pool.get()
- try:
- headers, chunks = yield deferToThread(
- swift.quiet_swiftclient, swift_connection.get_object,
- container, name, resp_chunk_size=self.CHUNK_SIZE)
- swift_stream = TxSwiftStream(swift_connection, chunks)
- defer.returnValue(swift_stream)
- except swiftclient.ClientException as x:
- if x.http_status == 404:
- swift.connection_pool.put(swift_connection)
- else:
- self.swift_download_fails += 1
- log.err(x)
- except Exception as x:
- self.swift_download_fails += 1
- log.err(x)
+        swift_failed = False
+        for connection_pool in reversed(swift.connection_pools):
+            swift_connection = connection_pool.get()
+            try:
+                headers, chunks = yield deferToThread(
+                    swift.quiet_swiftclient, swift_connection.get_object,
+                    container, name, resp_chunk_size=self.CHUNK_SIZE)
+                swift_stream = TxSwiftStream(
+                    connection_pool, swift_connection, chunks)
+                defer.returnValue(swift_stream)
+            except swiftclient.ClientException as x:
+                if x.http_status == 404:
+                    # Not in this instance (yet); the connection is still
+                    # healthy, so return it to its pool for reuse.
+                    connection_pool.put(swift_connection)
+                else:
+                    swift_failed = True
+                    log.err(x)
+            except Exception as x:
+                swift_failed = True
+                log.err(x)
+        # A plain 404 from every instance is expected, not a failure.
+        if swift_failed:
+            self.swift_download_fails += 1
# If Swift failed, for any reason, fall through to try and
# stream the data from disk. In particular, files cannot be
# found in Swift until librarian-feed-swift.py has put them
@@ -176,7 +185,7 @@ class TxSwiftStream(swift.SwiftStream):
# the connection can be reused saving on auth
# handshakes.
if self._swift_connection is not None:
- swift.connection_pool.put(self._swift_connection)
+ self._connection_pool.put(self._swift_connection)
self._swift_connection = None
self._chunks = None
defer.returnValue(b'')
diff --git a/lib/lp/services/librarianserver/swift.py b/lib/lp/services/librarianserver/swift.py
index c05a702..662b2fa 100644
--- a/lib/lp/services/librarianserver/swift.py
+++ b/lib/lp/services/librarianserver/swift.py
@@ -9,9 +9,10 @@ __metaclass__ = type
__all__ = [
'SWIFT_CONTAINER_PREFIX',
'connection',
- 'connection_pool',
+ 'connection_pools',
'filesystem_path',
'quiet_swiftclient',
+ 'reconfigure_connection_pools',
'swift_location',
'to_swift',
]
@@ -70,7 +71,7 @@ def to_swift(log, start_lfc_id=None, end_lfc_id=None,
If remove_func is set, it is called for every file after being copied into
Swift.
'''
- swift_connection = connection_pool.get()
+ swift_connection = connection_pools[-1].get()
fs_root = os.path.abspath(config.librarian_server.root)
if start_lfc_id is None:
@@ -154,39 +155,49 @@ def to_swift(log, start_lfc_id=None, end_lfc_id=None,
lfc))
continue
- container, obj_name = swift_location(lfc)
-
- try:
- quiet_swiftclient(swift_connection.head_container, container)
- log.debug2('{0} container already exists'.format(container))
- except swiftclient.ClientException as x:
- if x.http_status != 404:
- raise
- log.info('Creating {0} container'.format(container))
- swift_connection.put_container(container)
-
- try:
- headers = quiet_swiftclient(
- swift_connection.head_object, container, obj_name)
- log.debug(
- "{0} already exists in Swift({1}, {2})".format(
- lfc, container, obj_name))
- if ('X-Object-Manifest' not in headers and
- int(headers['content-length'])
- != os.path.getsize(fs_path)):
- raise AssertionError(
- '{0} has incorrect size in Swift'.format(lfc))
- except swiftclient.ClientException as x:
- if x.http_status != 404:
- raise
- log.info('Putting {0} into Swift ({1}, {2})'.format(
- lfc, container, obj_name))
- _put(log, swift_connection, lfc, container, obj_name, fs_path)
+ _to_swift_file(log, swift_connection, lfc, fs_path)
if remove_func:
remove_func(fs_path)
- connection_pool.put(swift_connection)
+ connection_pools[-1].put(swift_connection)
+
+
+def _to_swift_file(log, swift_connection, lfc_id, fs_path):
+ '''Copy a single file into Swift.
+
+ This is separate for the benefit of tests; production code should use
+ `to_swift` rather than calling this function directly, since this omits
+ a number of checks.
+ '''
+ container, obj_name = swift_location(lfc_id)
+
+ try:
+ quiet_swiftclient(swift_connection.head_container, container)
+ log.debug2('{0} container already exists'.format(container))
+ except swiftclient.ClientException as x:
+ if x.http_status != 404:
+ raise
+ log.info('Creating {0} container'.format(container))
+ swift_connection.put_container(container)
+
+ try:
+ headers = quiet_swiftclient(
+ swift_connection.head_object, container, obj_name)
+ log.debug(
+ "{0} already exists in Swift({1}, {2})".format(
+ lfc_id, container, obj_name))
+ if ('X-Object-Manifest' not in headers and
+ int(headers['content-length'])
+ != os.path.getsize(fs_path)):
+ raise AssertionError(
+ '{0} has incorrect size in Swift'.format(lfc_id))
+ except swiftclient.ClientException as x:
+ if x.http_status != 404:
+ raise
+ log.info('Putting {0} into Swift ({1}, {2})'.format(
+ lfc_id, container, obj_name))
+ _put(log, swift_connection, lfc_id, container, obj_name, fs_path)
def rename(path):
@@ -283,7 +294,8 @@ def filesystem_path(lfc_id):
class SwiftStream:
- def __init__(self, swift_connection, chunks):
+ def __init__(self, connection_pool, swift_connection, chunks):
+ self._connection_pool = connection_pool
self._swift_connection = swift_connection
self._chunks = chunks # Generator from swiftclient.get_object()
@@ -311,7 +323,7 @@ class SwiftStream:
# If we have drained the data successfully,
# the connection can be reused saving on auth
# handshakes.
- connection_pool.put(self._swift_connection)
+ self._connection_pool.put(self._swift_connection)
self._swift_connection = None
self._chunks = None
break
@@ -378,14 +390,20 @@ class HashStream:
class ConnectionPool:
MAX_POOL_SIZE = 10
- def __init__(self):
+ def __init__(self, os_auth_url, os_username, os_password, os_tenant_name,
+ os_auth_version):
+ self.os_auth_url = os_auth_url
+ self.os_username = os_username
+ self.os_password = os_password
+ self.os_tenant_name = os_tenant_name
+ self.os_auth_version = os_auth_version
self.clear()
def clear(self):
self._pool = []
def get(self):
- '''Return a conection from the pool, or a fresh connection.'''
+ '''Return a connection from the pool, or a fresh connection.'''
try:
return self._pool.pop()
except IndexError:
@@ -408,20 +426,51 @@ class ConnectionPool:
def _new_connection(self):
return swiftclient.Connection(
- authurl=config.librarian_server.os_auth_url,
- user=config.librarian_server.os_username,
- key=config.librarian_server.os_password,
- tenant_name=config.librarian_server.os_tenant_name,
- auth_version=config.librarian_server.os_auth_version,
+ authurl=self.os_auth_url,
+ user=self.os_username,
+ key=self.os_password,
+ tenant_name=self.os_tenant_name,
+ auth_version=self.os_auth_version,
)
-connection_pool = ConnectionPool()
+connection_pools = []
+
+
+def reconfigure_connection_pools():
+ del connection_pools[:]
+ # The zero-one-infinity principle suggests that we should generalize
+ # this to more than two pools. However, lazr.config makes this a bit
+ # awkward (there's no native support for lists of key-value pairs with
+ # schema enforcement nor for multi-line values, so we'd have to encode
+ # lists as JSON and check the schema manually), and at the moment the
+ # only use case for this is for migrating from an old Swift instance to
+ # a new one.
+ if config.librarian_server.old_os_auth_url:
+ connection_pools.append(
+ ConnectionPool(
+ config.librarian_server.old_os_auth_url,
+ config.librarian_server.old_os_username,
+ config.librarian_server.old_os_password,
+ config.librarian_server.old_os_tenant_name,
+ config.librarian_server.old_os_auth_version))
+ if config.librarian_server.os_auth_url:
+ connection_pools.append(
+ ConnectionPool(
+ config.librarian_server.os_auth_url,
+ config.librarian_server.os_username,
+ config.librarian_server.os_password,
+ config.librarian_server.os_tenant_name,
+ config.librarian_server.os_auth_version))
+
+
+reconfigure_connection_pools()
@contextmanager
-def connection():
- global connection_pool
+def connection(connection_pool=None):
+ if connection_pool is None:
+ connection_pool = connection_pools[-1]
con = connection_pool.get()
yield con
diff --git a/lib/lp/services/librarianserver/tests/test_gc.py b/lib/lp/services/librarianserver/tests/test_gc.py
index 0d8ac5f..ff4d696 100644
--- a/lib/lp/services/librarianserver/tests/test_gc.py
+++ b/lib/lp/services/librarianserver/tests/test_gc.py
@@ -32,6 +32,10 @@ from six.moves.urllib.parse import urljoin
from sqlobject import SQLObjectNotFound
from storm.store import Store
from swiftclient import client as swiftclient
+from testtools.matchers import (
+ Equals,
+ MatchesListwise,
+ )
import transaction
from lp.services.config import config
@@ -720,7 +724,6 @@ class TestSwiftLibrarianGarbageCollection(
# the lp.testing.layers code and save the per-test overhead.
self.swift_fixture = self.useFixture(SwiftFixture())
- self.addCleanup(swift.connection_pool.clear)
self.useFixture(FeatureFixture({'librarian.swift.enabled': True}))
@@ -732,24 +735,34 @@ class TestSwiftLibrarianGarbageCollection(
swift.to_swift(BufferLogger(), remove_func=os.unlink)
assert not os.path.exists(path), "to_swift failed to move files"
- def file_exists(self, content_id, suffix=None):
+ def file_exists(self, content_id, suffix=None, pool_index=None):
container, name = swift.swift_location(content_id)
if suffix:
name += suffix
- with swift.connection() as swift_connection:
- try:
- swift.quiet_swiftclient(
- swift_connection.head_object, container, name)
- return True
- except swiftclient.ClientException as x:
- if x.http_status == 404:
- return False
- raise
+ connection_pools = (
+ [swift.connection_pools[pool_index]] if pool_index is not None
+ else swift.connection_pools)
+ for connection_pool in connection_pools:
+ with swift.connection(connection_pool) as swift_connection:
+ try:
+ swift.quiet_swiftclient(
+ swift_connection.head_object, container, name)
+ return True
+ except swiftclient.ClientException as x:
+ if x.http_status != 404:
+ raise
+ return False
def remove_file(self, content_id):
container, name = swift.swift_location(content_id)
- with swift.connection() as swift_connection:
- swift_connection.delete_object(container, name)
+ for connection_pool in swift.connection_pools:
+ with swift.connection(connection_pool) as swift_connection:
+ try:
+ swift.quiet_swiftclient(
+ swift_connection.delete_object, container, name)
+ except swiftclient.ClientException as x:
+ if x.http_status != 404:
+ raise
def test_delete_unwanted_files_handles_segments(self):
# Large files are handled by Swift as multiple segments joined
@@ -920,6 +933,142 @@ class TestSwiftLibrarianGarbageCollection(
self.assertTrue(self.file_exists(f2_id))
+class TestTwoSwiftsLibrarianGarbageCollection(
+ TestSwiftLibrarianGarbageCollection):
+ """Garbage-collection tests with two Swift instances."""
+
+ def setUp(self):
+ self.old_swift_fixture = self.useFixture(
+ SwiftFixture(old_instance=True))
+ super(TestTwoSwiftsLibrarianGarbageCollection, self).setUp()
+
+ def test_swift_files_multiple_instances(self):
+ # swift_files yields files in the correct order across multiple
+ # Swift instances.
+ switch_dbuser('testadmin')
+ content = b'foo'
+ lfas = [
+ LibraryFileAlias.get(self.client.addFile(
+ 'foo.txt', len(content), io.BytesIO(content), 'text/plain'))
+ for _ in range(12)]
+ lfc_ids = [lfa.contentID for lfa in lfas]
+ transaction.commit()
+
+ # Simulate a migration in progress. Some files are only in the old
+ # Swift instance and have not been copied over yet; some are in both
+ # instances; some were created after the migration started and so
+ # are only in the new instance.
+ old_lfc_ids = [
+ lfc_id for i, lfc_id in enumerate(lfc_ids)
+ if i in {0, 1, 3, 4, 6, 7, 8, 9}]
+ for lfc_id in old_lfc_ids:
+ with swift.connection(
+ swift.connection_pools[0]) as swift_connection:
+ swift._to_swift_file(
+ BufferLogger(), swift_connection, lfc_id,
+ swift.filesystem_path(lfc_id))
+ new_lfc_ids = [
+ lfc_id for i, lfc_id in enumerate(lfc_ids)
+ if i in {1, 2, 3, 8, 9, 10}]
+ for lfc_id in new_lfc_ids:
+ with swift.connection() as swift_connection:
+ swift._to_swift_file(
+ BufferLogger(), swift_connection, lfc_id,
+ swift.filesystem_path(lfc_id))
+
+ self.assertThat(
+ [(pool, container, int(obj['name']))
+ for pool, container, obj in librariangc.swift_files(lfc_ids[-1])
+ if int(obj['name']) >= lfc_ids[0]],
+ MatchesListwise([
+ Equals((
+ swift.connection_pools[pool_index], container, lfc_ids[i]))
+ for pool_index, container, i in (
+ (0, 'librarian_0', 0),
+ (0, 'librarian_0', 1),
+ (1, 'librarian_0', 1),
+ (1, 'librarian_0', 2),
+ (0, 'librarian_0', 3),
+ (1, 'librarian_0', 3),
+ (0, 'librarian_0', 4),
+ (0, 'librarian_0', 6),
+ (0, 'librarian_0', 7),
+ (0, 'librarian_0', 8),
+ (1, 'librarian_0', 8),
+ (0, 'librarian_0', 9),
+ (1, 'librarian_0', 9),
+ (1, 'librarian_0', 10),
+ )]))
+
+ def test_delete_unwanted_files_handles_multiple_instances(self):
+ # GC handles cases where files only exist in one or the other Swift
+ # instance.
+ switch_dbuser('testadmin')
+ content = b'foo'
+ lfas = [
+ LibraryFileAlias.get(self.client.addFile(
+ 'foo.txt', len(content), io.BytesIO(content), 'text/plain'))
+ for _ in range(12)]
+ lfc_ids = [lfa.contentID for lfa in lfas]
+ transaction.commit()
+
+ for lfc_id in lfc_ids:
+ # Make the files old so they don't look in-progress.
+ os.utime(swift.filesystem_path(lfc_id), (0, 0))
+
+ # Simulate a migration in progress. Some files are only in the old
+ # Swift instance and have not been copied over yet; some are in both
+ # instances; some were created after the migration started and so
+ # are only in the new instance.
+ old_lfc_ids = [
+ lfc_id for i, lfc_id in enumerate(lfc_ids)
+ if i in {0, 1, 3, 4, 6, 7, 8}]
+ for lfc_id in old_lfc_ids:
+ with swift.connection(
+ swift.connection_pools[0]) as swift_connection:
+ swift._to_swift_file(
+ BufferLogger(), swift_connection, lfc_id,
+ swift.filesystem_path(lfc_id))
+ new_lfc_ids = [
+ lfc_id for i, lfc_id in enumerate(lfc_ids)
+ if i in {1, 2, 3, 8, 9, 10}]
+ for lfc_id in new_lfc_ids:
+ with swift.connection() as swift_connection:
+ swift._to_swift_file(
+ BufferLogger(), swift_connection, lfc_id,
+ swift.filesystem_path(lfc_id))
+
+ # All the files survive the first run.
+ with self.librariangc_thinking_it_is_tomorrow():
+ librariangc.delete_unwanted_files(self.con)
+ for lfc_id in lfc_ids:
+ self.assertEqual(
+ lfc_id in old_lfc_ids, self.file_exists(lfc_id, pool_index=0))
+ self.assertEqual(
+ lfc_id in new_lfc_ids, self.file_exists(lfc_id, pool_index=1))
+
+ # Remove one file that is only in the old instance, one that is in
+ # both instances, and one that is only in the new instance.
+ for i in (0, 3, 9):
+ content = lfas[i].content
+ Store.of(lfas[i]).remove(lfas[i])
+ Store.of(content).remove(content)
+ transaction.commit()
+
+ # The now-unreferenced files are removed, but the others are intact.
+ with self.librariangc_thinking_it_is_tomorrow():
+ librariangc.delete_unwanted_files(self.con)
+ old_lfc_ids = [
+ lfc_id for i, lfc_id in enumerate(lfc_ids) if i in {1, 4, 6, 7, 8}]
+ new_lfc_ids = [
+ lfc_id for i, lfc_id in enumerate(lfc_ids) if i in {1, 2, 8, 10}]
+ for lfc_id in lfc_ids:
+ self.assertEqual(
+ lfc_id in old_lfc_ids, self.file_exists(lfc_id, pool_index=0))
+ self.assertEqual(
+ lfc_id in new_lfc_ids, self.file_exists(lfc_id, pool_index=1))
+
+
class TestBlobCollection(TestCase):
layer = LaunchpadZopelessLayer
diff --git a/lib/lp/services/librarianserver/tests/test_storage_db.py b/lib/lp/services/librarianserver/tests/test_storage_db.py
index b24d87b..63929b5 100644
--- a/lib/lp/services/librarianserver/tests/test_storage_db.py
+++ b/lib/lp/services/librarianserver/tests/test_storage_db.py
@@ -8,6 +8,7 @@ import os.path
import time
from fixtures import TempDir
+from swiftclient import client as swiftclient
from testtools.testcase import ExpectedException
from testtools.twistedsupport import AsynchronousDeferredRunTest
import transaction
@@ -157,19 +158,24 @@ class LibrarianStorageSwiftTests(TestCase):
self.pushConfig('librarian_server', root=self.directory)
self.storage = LibrarianStorage(self.directory, db.Library())
transaction.commit()
- self.addCleanup(swift.connection_pool.clear)
- def moveToSwift(self, lfc_id):
- # Move a file to Swift so that we know it can't accidentally be
- # retrieved from the local file system. We set its modification
- # time far enough in the past that it isn't considered potentially
- # in progress.
+ def copyToSwift(self, lfc_id, swift_fixture=None):
+ # Copy a file to Swift.
+ if swift_fixture is None:
+ swift_fixture = self.swift_fixture
path = swift.filesystem_path(lfc_id)
- mtime = time.time() - 25 * 60 * 60
- os.utime(path, (mtime, mtime))
- self.assertTrue(os.path.exists(path))
- swift.to_swift(DevNullLogger(), remove_func=os.unlink)
- self.assertFalse(os.path.exists(path))
+ swift_connection = swift_fixture.connect()
+ try:
+ swift._to_swift_file(
+ DevNullLogger(), swift_connection, lfc_id, path)
+ finally:
+ swift_connection.close()
+
+ def moveToSwift(self, lfc_id, swift_fixture=None):
+ # Move a file to Swift so that we know it can't accidentally be
+ # retrieved from the local file system.
+ self.copyToSwift(lfc_id, swift_fixture=swift_fixture)
+ os.unlink(swift.filesystem_path(lfc_id))
@defer.inlineCallbacks
def test_completed_fetch_reuses_connection(self):
@@ -190,7 +196,7 @@ class LibrarianStorageSwiftTests(TestCase):
break
chunks.append(chunk)
self.assertEqual(b''.join(chunks), data)
- self.assertEqual(1, len(swift.connection_pool._pool))
+ self.assertEqual(1, len(swift.connection_pools[-1]._pool))
@defer.inlineCallbacks
def test_partial_fetch_does_not_reuse_connection(self):
@@ -209,7 +215,7 @@ class LibrarianStorageSwiftTests(TestCase):
stream.close()
with ExpectedException(ValueError, 'I/O operation on closed file'):
yield stream.read(self.storage.CHUNK_SIZE)
- self.assertEqual(0, len(swift.connection_pool._pool))
+ self.assertEqual(0, len(swift.connection_pools[-1]._pool))
@defer.inlineCallbacks
def test_fetch_with_close_at_end_does_not_reuse_connection(self):
@@ -235,4 +241,71 @@ class LibrarianStorageSwiftTests(TestCase):
self.assertEqual(b'', chunk)
# In principle we might be able to reuse the connection here, but
# SwiftStream.close doesn't know that.
- self.assertEqual(0, len(swift.connection_pool._pool))
+ self.assertEqual(0, len(swift.connection_pools[-1]._pool))
+
+ @defer.inlineCallbacks
+ def test_multiple_swift_instances(self):
+ # If multiple Swift instances are configured, LibrarianStorage tries
+ # each in turn until it finds the object.
+ old_swift_fixture = self.useFixture(SwiftFixture(old_instance=True))
+ # We need to push this again, since setting up SwiftFixture reloads
+ # the config.
+ self.pushConfig('librarian_server', root=self.directory)
+
+ old_data = b'x' * (self.storage.CHUNK_SIZE * 4 + 1)
+ old_file = self.storage.startAddFile('file1', len(old_data))
+ old_file.mimetype = 'text/plain'
+ old_file.append(old_data)
+ old_lfc_id, _ = old_file.store()
+ self.moveToSwift(old_lfc_id, swift_fixture=old_swift_fixture)
+
+ both_data = b'y' * (self.storage.CHUNK_SIZE * 4 + 1)
+ both_file = self.storage.startAddFile('file2', len(both_data))
+ both_file.mimetype = 'text/plain'
+ both_file.append(both_data)
+ both_lfc_id, _ = both_file.store()
+ self.copyToSwift(both_lfc_id, swift_fixture=old_swift_fixture)
+ self.moveToSwift(both_lfc_id)
+
+ new_data = b'z' * (self.storage.CHUNK_SIZE * 4 + 1)
+ new_file = self.storage.startAddFile('file3', len(new_data))
+ new_file.mimetype = 'text/plain'
+ new_file.append(new_data)
+ new_lfc_id, _ = new_file.store()
+ self.moveToSwift(new_lfc_id)
+
+ old_stream = yield self.storage.open(old_lfc_id)
+ self.assertIsNotNone(old_stream)
+ self.assertEqual(
+ swift.connection_pools[0], old_stream._connection_pool)
+ chunks = []
+ while True:
+ chunk = yield old_stream.read(self.storage.CHUNK_SIZE)
+ if not chunk:
+ break
+ chunks.append(chunk)
+ self.assertEqual(b''.join(chunks), old_data)
+
+ both_stream = yield self.storage.open(both_lfc_id)
+ self.assertIsNotNone(both_stream)
+ self.assertEqual(
+ swift.connection_pools[1], both_stream._connection_pool)
+ chunks = []
+ while True:
+ chunk = yield both_stream.read(self.storage.CHUNK_SIZE)
+ if not chunk:
+ break
+ chunks.append(chunk)
+ self.assertEqual(b''.join(chunks), both_data)
+
+ new_stream = yield self.storage.open(new_lfc_id)
+ self.assertIsNotNone(new_stream)
+ self.assertEqual(
+ swift.connection_pools[1], new_stream._connection_pool)
+ chunks = []
+ while True:
+ chunk = yield new_stream.read(self.storage.CHUNK_SIZE)
+ if not chunk:
+ break
+ chunks.append(chunk)
+ self.assertEqual(b''.join(chunks), new_data)
diff --git a/lib/lp/services/librarianserver/tests/test_swift.py b/lib/lp/services/librarianserver/tests/test_swift.py
index b891b0d..61a4a3d 100644
--- a/lib/lp/services/librarianserver/tests/test_swift.py
+++ b/lib/lp/services/librarianserver/tests/test_swift.py
@@ -43,8 +43,6 @@ class TestFeedSwift(TestCase):
self.useFixture(FeatureFixture({'librarian.swift.enabled': True}))
transaction.commit()
- self.addCleanup(swift.connection_pool.clear)
-
# Restart the Librarian so it picks up the OS_* environment
# variables.
LibrarianLayer.librarian_fixture.cleanUp()
@@ -167,6 +165,45 @@ class TestFeedSwift(TestCase):
headers, obj = swift_client.get_object(container, name)
self.assertEqual(contents, obj, 'Did not round trip')
+ def test_move_to_swift_multiple_instances(self):
+ # If multiple Swift instances are configured, we only migrate to the
+ # newest one.
+ old_swift_fixture = self.useFixture(SwiftFixture(old_instance=True))
+
+ log = BufferLogger()
+
+ # Confirm that files exist on disk where we expect to find them.
+ for lfc in self.lfcs:
+ path = swift.filesystem_path(lfc.id)
+ self.assertTrue(os.path.exists(path))
+
+ # Migrate all the files into Swift.
+ swift.to_swift(log, remove_func=os.unlink)
+
+ # Confirm that all the files have gone from disk.
+ for lfc in self.lfcs:
+ self.assertFalse(os.path.exists(swift.filesystem_path(lfc.id)))
+
+ # Confirm all the files are in the correct Swift instance.
+ swift_client = self.swift_fixture.connect()
+ try:
+ for lfc, contents in zip(self.lfcs, self.contents):
+ container, name = swift.swift_location(lfc.id)
+ headers, obj = swift_client.get_object(container, name)
+ self.assertEqual(contents, obj, 'Did not round trip')
+ finally:
+ swift_client.close()
+
+ old_swift_client = old_swift_fixture.connect()
+ try:
+ for lfc, contents in zip(self.lfcs, self.contents):
+ container, name = swift.swift_location(lfc.id)
+ self.assertRaises(
+ swiftclient.ClientException, swift.quiet_swiftclient,
+ old_swift_client.get_object, container, name)
+ finally:
+ old_swift_client.close()
+
def test_librarian_serves_from_swift(self):
log = BufferLogger()
@@ -289,7 +326,7 @@ class TestFeedSwift(TestCase):
# Our object round tripped
self.assertEqual(obj1 + obj2 + obj3, expected_content)
- def test_multiple_instances(self):
+ def test_multiple_feed_instances(self):
log = BufferLogger()
# Confirm that files exist on disk where we expect to find them.
diff --git a/lib/lp/testing/swift/fixture.py b/lib/lp/testing/swift/fixture.py
index 2158b29..6e479b5 100644
--- a/lib/lp/testing/swift/fixture.py
+++ b/lib/lp/testing/swift/fixture.py
@@ -19,6 +19,7 @@ import testtools.content_type
from txfixtures.tachandler import TacTestFixture
from lp.services.config import config
+from lp.services.librarianserver import swift
from lp.testing.layers import BaseLayer
from lp.testing.swift import fakeswift
@@ -31,6 +32,15 @@ class SwiftFixture(TacTestFixture):
root = None
daemon_port = None
+ def __init__(self, old_instance=False):
+ super(SwiftFixture, self).__init__()
+ self.old_instance = old_instance
+
+ def _getConfig(self, key):
+ return getattr(
+ config.librarian_server,
+ 'old_' + key if self.old_instance else key)
+
def setUp(self, spew=False, umask=None):
# Pick a random, free port.
if self.daemon_port is None:
@@ -56,21 +66,26 @@ class SwiftFixture(TacTestFixture):
self, logfile, 'swift-log', testtools.content_type.UTF8_TEXT,
buffer_now=False)
+ self.addCleanup(swift.reconfigure_connection_pools)
service_config = dedent("""\
[librarian_server]
- os_auth_url: http://localhost:{0}/keystone/v2.0/
- os_username: {1}
- os_password: {2}
- os_tenant_name: {3}
+ {prefix}os_auth_url: http://localhost:{port}/keystone/v2.0/
+ {prefix}os_username: {username}
+ {prefix}os_password: {password}
+ {prefix}os_tenant_name: {tenant_name}
""".format(
- self.daemon_port, fakeswift.DEFAULT_USERNAME,
- fakeswift.DEFAULT_PASSWORD, fakeswift.DEFAULT_TENANT_NAME))
+ prefix=('old_' if self.old_instance else ''),
+ port=self.daemon_port,
+ username=fakeswift.DEFAULT_USERNAME,
+ password=fakeswift.DEFAULT_PASSWORD,
+ tenant_name=fakeswift.DEFAULT_TENANT_NAME))
BaseLayer.config_fixture.add_section(service_config)
config.reloadConfig()
self.addCleanup(config.reloadConfig)
self.addCleanup(
BaseLayer.config_fixture.remove_section, service_config)
- assert config.librarian_server.os_tenant_name == 'test'
+ assert self._getConfig('os_tenant_name') == 'test'
+ swift.reconfigure_connection_pools()
def setUpRoot(self):
# Create a root directory.
@@ -88,11 +103,11 @@ class SwiftFixture(TacTestFixture):
def connect(self, **kwargs):
"""Return a valid connection to our mock Swift"""
connection_kwargs = {
- "authurl": config.librarian_server.os_auth_url,
- "auth_version": config.librarian_server.os_auth_version,
- "tenant_name": config.librarian_server.os_tenant_name,
- "user": config.librarian_server.os_username,
- "key": config.librarian_server.os_password,
+ "authurl": self._getConfig("os_auth_url"),
+ "auth_version": self._getConfig("os_auth_version"),
+ "tenant_name": self._getConfig("os_tenant_name"),
+ "user": self._getConfig("os_username"),
+ "key": self._getConfig("os_password"),
"retries": 0,
"insecure": True,
}
diff --git a/lib/lp/testing/swift/tests/test_fixture.py b/lib/lp/testing/swift/tests/test_fixture.py
index 730de53..c3cc01c 100644
--- a/lib/lp/testing/swift/tests/test_fixture.py
+++ b/lib/lp/testing/swift/tests/test_fixture.py
@@ -14,10 +14,12 @@ from swiftclient import client as swiftclient
from testtools.matchers import (
GreaterThan,
LessThan,
+ MatchesStructure,
Not,
)
from lp.services.config import config
+from lp.services.librarianserver import swift
from lp.testing import TestCase
from lp.testing.factory import ObjectFactory
from lp.testing.layers import BaseLayer
@@ -205,14 +207,62 @@ class TestSwiftFixture(TestCase):
self.assertEqual(body, b"0" * size)
def test_env(self):
- self.assertEqual(
- fakeswift.DEFAULT_USERNAME, config.librarian_server.os_username)
- self.assertEqual(
- fakeswift.DEFAULT_PASSWORD, config.librarian_server.os_password)
- self.assertEqual(
- 'http://localhost:{0}/keystone/v2.0/'.format(
+ self.assertThat(config.librarian_server, MatchesStructure.byEquality(
+ os_auth_url='http://localhost:{0}/keystone/v2.0/'.format(
self.swift_fixture.daemon_port),
- config.librarian_server.os_auth_url)
- self.assertEqual(
- fakeswift.DEFAULT_TENANT_NAME,
- config.librarian_server.os_tenant_name)
+ os_username=fakeswift.DEFAULT_USERNAME,
+ os_password=fakeswift.DEFAULT_PASSWORD,
+ os_tenant_name=fakeswift.DEFAULT_TENANT_NAME,
+ ))
+
+ def test_old_instance_env(self):
+ old_swift_fixture = self.useFixture(SwiftFixture(old_instance=True))
+ self.assertThat(config.librarian_server, MatchesStructure.byEquality(
+ os_auth_url='http://localhost:{0}/keystone/v2.0/'.format(
+ self.swift_fixture.daemon_port),
+ os_username=fakeswift.DEFAULT_USERNAME,
+ os_password=fakeswift.DEFAULT_PASSWORD,
+ os_tenant_name=fakeswift.DEFAULT_TENANT_NAME,
+ old_os_auth_url='http://localhost:{0}/keystone/v2.0/'.format(
+ old_swift_fixture.daemon_port),
+ old_os_username=fakeswift.DEFAULT_USERNAME,
+ old_os_password=fakeswift.DEFAULT_PASSWORD,
+ old_os_tenant_name=fakeswift.DEFAULT_TENANT_NAME,
+ ))
+
+ def test_reconfigures_librarian_server(self):
+ # Fixtures providing old and new Swift instances don't interfere
+ # with each other, and they reconfigure the librarian server
+ # appropriately on setup.
+ self.assertEqual(1, len(swift.connection_pools))
+ message = b"Hello World!"
+ with swift.connection() as client:
+ cname, oname = self.makeSampleObject(
+ client, message, "text/something")
+ headers, body = client.get_object(cname, oname)
+ self.assertEqual(message, body)
+ self.useFixture(SwiftFixture(old_instance=True))
+ self.assertEqual(2, len(swift.connection_pools))
+ with swift.connection() as client:
+ headers, body = client.get_object(cname, oname)
+ self.assertEqual(message, body)
+ with swift.connection(swift.connection_pools[0]) as old_client:
+ exc = self.assertRaises(
+ swiftclient.ClientException,
+ old_client.get_object, cname, oname)
+ self.assertEqual(404, exc.http_status)
+ old_cname, old_oname = self.makeSampleObject(
+ old_client, message, "text/something")
+ headers, body = old_client.get_object(old_cname, old_oname)
+ self.assertEqual(message, body)
+ with swift.connection(swift.connection_pools[1]) as client:
+ exc = self.assertRaises(
+ swiftclient.ClientException,
+ client.get_object, old_cname, old_oname)
+ self.assertEqual(404, exc.http_status)
+ # The last (i.e. newest) connection pool is the default.
+ with swift.connection() as client:
+ exc = self.assertRaises(
+ swiftclient.ClientException,
+ client.get_object, old_cname, old_oname)
+ self.assertEqual(404, exc.http_status)