← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~stub/launchpad/swift-librarian into lp:launchpad

 

Stuart Bishop has proposed merging lp:~stub/launchpad/swift-librarian into lp:launchpad with lp:~stub/launchpad/mock-swift as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~stub/launchpad/swift-librarian/+merge/179142

Move Librarian storage from local disk to Swift.

The two parts to this are:

    - A script to copy Librarian files from local disk into Swift, optionally removing them from local storage.

    - Modifying the Librarian to attempt to serve files from Swift, falling back to serving files from disk.

This should allow us to migrate painlessly, using the following steps.
    1) Schedule regular job to copy files into Swift.
    2) Turn on feature flag enabling Librarian to serve files from Swift
    3) Alter regular job to move files into Swift rather than copy.

At this point, the only Librarian files on local disk are the recently uploaded ones. We could then do further work to have uploads streamed directly into Swift, but I think it is better to not bother as the failure mode if Swift dies is better when local disk is available as a buffer, as we can still accept uploads and serve recently uploaded files.
-- 
https://code.launchpad.net/~stub/launchpad/swift-librarian/+merge/179142
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~stub/launchpad/swift-librarian into lp:launchpad.
=== added file 'cronscripts/librarian-feed-swift.py'
--- cronscripts/librarian-feed-swift.py	1970-01-01 00:00:00 +0000
+++ cronscripts/librarian-feed-swift.py	2013-08-15 13:52:53 +0000
@@ -0,0 +1,49 @@
+#!/usr/bin/python -S
+#
+# Copyright 2013 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Move files from Librarian disk storage into Swift."""
+
+__metaclass__ = type
+
+import _pythonpath
+
+from lp.services.scripts.base import LaunchpadCronScript
+from lp.services.librarianserver import swift
+
+
class LibrarianFeedSwift(LaunchpadCronScript):
    """Copy Librarian files from local disk into Swift.

    Files may be selected individually (--id, repeatable) or as an
    inclusive range of LibraryFileContent ids (--start/--end), and
    optionally removed from disk after a successful copy (--remove).
    """

    def add_my_options(self):
        # type=int so ids arrive as integers, consistent with the
        # --start/--end options below (previously they were left as
        # strings).
        self.parser.add_option(
            "-i", "--id", action="append", dest="ids", default=[],
            type=int, metavar="CONTENT_ID", help="Migrate a single file")
        self.parser.add_option(
            "-r", "--remove", action="store_true", default=False,
            help="Remove files from disk after migration (default: False)")
        self.parser.add_option(
            "-s", "--start", action="store", type=int, default=None,
            dest="start", metavar="CONTENT_ID",
            help="Migrate files starting from CONTENT_ID")
        self.parser.add_option(
            "-e", "--end", action="store", type=int, default=None,
            dest="end", metavar="CONTENT_ID",
            help="Migrate files up to and including CONTENT_ID")

    def main(self):
        if self.options.ids and (self.options.start or self.options.end):
            self.parser.error(
                "Cannot specify both individual file(s) and range")
        elif self.options.ids:
            # Each explicit id is migrated as a one-element range.
            for lfc in self.options.ids:
                swift.to_swift(self.logger, lfc, lfc, self.options.remove)
        else:
            swift.to_swift(
                self.logger, self.options.start, self.options.end,
                self.options.remove)
+
+
+if __name__ == '__main__':
+    script = LibrarianFeedSwift(
+        'librarian-feed-swift', dbuser='librarianfeedswift')
+    script.lock_and_run(isolation='autocommit')

=== modified file 'cronscripts/librarian-gc.py'
--- cronscripts/librarian-gc.py	2013-06-20 05:50:00 +0000
+++ cronscripts/librarian-gc.py	2013-08-15 13:52:53 +0000
@@ -16,6 +16,9 @@
 
 import logging
 
+from swiftclient import client as swiftclient
+from keystoneclient.v2_0 import client as keystoneclient
+
 from lp.services.config import config
 from lp.services.database.interfaces import IStore
 from lp.services.librarian.model import LibraryFileAlias
@@ -59,6 +62,7 @@
 
     def main(self):
         librariangc.log = self.logger
+        #swiftclient.logger.setLevel(logging.INFO)
 
         if self.options.loglevel <= logging.DEBUG:
             librariangc.debug = True

=== modified file 'daemons/librarian.tac'
--- daemons/librarian.tac	2011-12-30 01:48:17 +0000
+++ daemons/librarian.tac	2013-08-15 13:52:53 +0000
@@ -34,6 +34,7 @@
 from lp.services.librarianserver.libraryprotocol import FileUploadFactory
 from lp.services.scripts import execute_zcml_for_scripts
 from lp.services.twistedsupport.loggingsupport import set_up_oops_reporting
+from lp.services.twistedsupport.features import setup_feature_controller
 
 # Connect to database
 dbconfig.override(
@@ -65,6 +66,7 @@
 # Service that announces when the daemon is ready
 readyservice.ReadyService().setServiceParent(librarianService)
 
+
 def setUpListener(uploadPort, webPort, restricted):
     """Set up a librarian listener on the given ports.
 
@@ -107,6 +109,10 @@
 logfile = options.get("logfile")
 set_up_oops_reporting('librarian', 'librarian', logfile)
 
+# Allow use of feature flags.
+setup_feature_controller('librarian')
+
+
 # Setup a signal handler to dump the process' memory upon 'kill -44'.
 def sigdumpmem_handler(signum, frame):
     scanner.dump_all_objects(DUMP_FILE)

=== modified file 'database/schema/security.cfg'
--- database/schema/security.cfg	2013-07-26 11:18:27 +0000
+++ database/schema/security.cfg	2013-08-15 13:52:53 +0000
@@ -394,6 +394,12 @@
 public.libraryfilecontent               = SELECT, INSERT
 type=user
 
+[librarianfeedswift]
+groups=script
+public.libraryfilealias                 = SELECT
+public.libraryfilecontent               = SELECT
+type=user
+
 [librarianlogparser]
 groups=script
 public.country                          = SELECT

=== modified file 'lib/lp/services/features/__init__.py'
--- lib/lp/services/features/__init__.py	2012-07-30 12:11:31 +0000
+++ lib/lp/services/features/__init__.py	2013-08-15 13:52:53 +0000
@@ -159,7 +159,7 @@
     from lp.services.features.testing import FeatureFixture
 
     def setUp(self):
-        self.useFixture(FeatureFixture({'myflag', 'on'}))
+        self.useFixture(FeatureFixture({'myflag': 'on'}))
 
 
 You can also use the fixture as a context manager::

=== modified file 'lib/lp/services/features/flags.py'
--- lib/lp/services/features/flags.py	2013-05-22 09:48:07 +0000
+++ lib/lp/services/features/flags.py	2013-08-15 13:52:53 +0000
@@ -209,6 +209,18 @@
      '',
      '',
      ''),
+    ('twisted.flags.refresh',
+     'float',
+     'Number of seconds between feature flag refreshes.',
+     '30',
+     '',
+     ''),
+    ('librarian.swift.enabled',
+     'boolean',
+     'If true, attempt to serve files from Swift.',
+     'disabled',
+     '',
+     ''),
     ])
 
 # The set of all flag names that are documented.

=== modified file 'lib/lp/services/features/testing.py'
--- lib/lp/services/features/testing.py	2011-12-19 23:38:16 +0000
+++ lib/lp/services/features/testing.py	2013-08-15 13:52:53 +0000
@@ -9,6 +9,7 @@
 
 from fixtures import Fixture
 from lazr.restful.utils import get_current_browser_request
+import psycopg2
 
 from lp.services.features import (
     get_relevant_feature_controller,
@@ -20,6 +21,23 @@
     StormFeatureRuleSource,
     )
 from lp.services.features.scopes import ScopesFromRequest
+from lp.testing.dbuser import dbuser
+
+
def dbadmin(func):
    """Decorate a function to automatically reattempt with admin db perms.

    We don't just automatically switch to the admin user as this
    implicitly commits the transaction, and we want to avoid unnecessary
    commits to avoid breaking database setup optimizations.
    """
    def dbadmin_retry(*args, **kw):
        try:
            return func(*args, **kw)
        except psycopg2.ProgrammingError:
            # psycopg2 reports permission failures (among other
            # things) as ProgrammingError; retry as the testadmin
            # database user.
            # NOTE(review): any ProgrammingError triggers the retry,
            # not just permission errors, so a genuine SQL error is
            # executed twice before propagating - confirm acceptable.
            with dbuser('testadmin'):
                return func(*args, **kw)
    return dbadmin_retry
 
 
 class FeatureFixture(Fixture):
@@ -46,7 +64,7 @@
         :param features_dict: A dictionary-like object with keys and values
             that are flag names and those flags' settings.
         :param override_scope_lookup: If non-None, an argument that takes
-            a scope name and returns True if it matches.  If not specified, 
+            a scope name and returns True if it matches.  If not specified,
             scopes are looked up from the current request.
         """
         self.desired_features = features_dict
@@ -59,8 +77,9 @@
 
         rule_source = StormFeatureRuleSource()
         self.addCleanup(
-            rule_source.setAllRules, rule_source.getAllRulesAsTuples())
-        rule_source.setAllRules(self.makeNewRules())
+            dbadmin(rule_source.setAllRules),
+            dbadmin(rule_source.getAllRulesAsTuples)())
+        dbadmin(rule_source.setAllRules)(self.makeNewRules())
 
         original_controller = get_relevant_feature_controller()
 
@@ -74,6 +93,7 @@
             FeatureController(scope_lookup, rule_source))
         self.addCleanup(install_feature_controller, original_controller)
 
+    @dbadmin
     def makeNewRules(self):
         """Make a set of new feature flag rules."""
         # Create a list of the new rules. Note that rules with a None

=== modified file 'lib/lp/services/librarianserver/librariangc.py'
--- lib/lp/services/librarianserver/librariangc.py	2012-12-20 07:44:56 +0000
+++ lib/lp/services/librarianserver/librariangc.py	2013-08-15 13:52:53 +0000
@@ -15,6 +15,8 @@
 import sys
 from time import time
 
+import iso8601
+from swiftclient import client as swiftclient
 from zope.interface import implements
 
 from lp.services.config import config
@@ -23,9 +25,10 @@
     listReferences,
     quoteIdentifier,
     )
+from lp.services.features import getFeatureFlag
+from lp.services.librarianserver import swift
 from lp.services.librarianserver.storage import (
     _relFileLocation as relative_file_path,
-    _sameFile,
     )
 from lp.services.looptuner import (
     DBLoopTuner,
@@ -37,6 +40,70 @@
 debug = False
 
 
+STREAM_CHUNK_SIZE = 64 * 1024
+
+
def file_exists(content_id):
    """True if the file exists either on disk or in Swift.

    Swift is only checked if the librarian.swift.enabled feature flag
    is set.
    """
    swift_enabled = getFeatureFlag('librarian.swift.enabled') or False
    if swift_enabled:
        swift_connection = swift.connection_pool.get()
        container, name = swift.swift_location(content_id)
        try:
            swift_connection.head_object(container, name)
            # The connection is still usable; return it to the pool
            # before the early exit (previously it was leaked on this
            # path - only the 404 branch returned it).
            swift.connection_pool.put(swift_connection)
            return True
        except swiftclient.ClientException as x:
            if x.http_status != 404:
                raise
            # A 404 also leaves the connection in a usable state.
            swift.connection_pool.put(swift_connection)
    return os.path.exists(get_file_path(content_id))
+
+
+def _utcnow():
+    # Wrapper that is replaced in the test suite.
+    return datetime.utcnow()
+
+
def open_stream(content_id):
    """Return an open file for the given content_id.

    Swift is tried first (when the librarian.swift.enabled feature
    flag is set), falling back to local disk. Returns None if the
    file cannot be found in either place.
    """
    swift_enabled = getFeatureFlag('librarian.swift.enabled') or False
    if swift_enabled:
        swift_connection = swift.connection_pool.get()
        container, name = swift.swift_location(content_id)
        try:
            chunks = swift_connection.get_object(
                container, name, resp_chunk_size=STREAM_CHUNK_SIZE)[1]
            # The stream takes ownership of the connection and returns
            # it to the pool once fully drained.
            return swift.SwiftStream(swift_connection, chunks)
        except swiftclient.ClientException as x:
            if x.http_status != 404:
                raise
            # Not in Swift, but the connection is still usable; return
            # it to the pool (previously it was leaked here) and fall
            # back to disk.
            swift.connection_pool.put(swift_connection)
    path = get_file_path(content_id)
    if os.path.exists(path):
        return open(path, 'rb')

    return None  # File not found.
+
+
def same_file(content_id_1, content_id_2):
    """True if the two content ids refer to byte-identical files.

    Works regardless of whether each file lives on disk or in Swift.
    NOTE(review): assumes both files exist - callers check
    file_exists() first; a missing file would raise AttributeError.
    """
    file1 = open_stream(content_id_1)
    file2 = open_stream(content_id_2)
    try:
        # Read both streams in lockstep; the sentinel stops iteration
        # only when both reads return '' (simultaneous EOF).
        chunks_iter = iter(
            lambda: (
                file1.read(STREAM_CHUNK_SIZE),
                file2.read(STREAM_CHUNK_SIZE)),
            ('', ''))
        for chunk1, chunk2 in chunks_iter:
            if chunk1 != chunk2:
                return False
        return True
    finally:
        # Close the streams so file descriptors and Swift connections
        # are not left dangling (previously they were never closed).
        file1.close()
        file2.close()
+
+
 def confirm_no_clock_skew(con):
     """Raise an exception if there is significant clock skew between the
     database and this machine.
@@ -47,7 +114,7 @@
     cur = con.cursor()
     cur.execute("SELECT CURRENT_TIMESTAMP AT TIME ZONE 'UTC'")
     db_now = cur.fetchone()[0]
-    local_now = datetime.utcnow()
+    local_now = _utcnow()
     five_minutes = timedelta(minutes=5)
 
     if -five_minutes < local_now - db_now < five_minutes:
@@ -180,18 +247,13 @@
         # be more common because database records has been synced from
         # production but the actual librarian contents has not.
         dupe1_id = dupes[0]
-        dupe1_path = get_file_path(dupe1_id)
-        if not os.path.exists(dupe1_path):
+        if not file_exists(dupe1_id):
             if config.instance_name == 'staging':
                 log.debug(
-                        "LibraryFileContent %d data is missing (%s)",
-                        dupe1_id, dupe1_path
-                        )
+                        "LibraryFileContent %d data is missing", dupe1_id)
             else:
                 log.warning(
-                        "LibraryFileContent %d data is missing (%s)",
-                        dupe1_id, dupe1_path
-                        )
+                        "LibraryFileContent %d data is missing", dupe1_id)
             continue
 
         # Do a manual check that they really are identical, because we
@@ -200,10 +262,8 @@
         # with an identical filesize to an existing file. Which is pretty
         # unlikely. Where did I leave my tin foil hat?
         for dupe2_id in (dupe for dupe in dupes[1:]):
-            dupe2_path = get_file_path(dupe2_id)
             # Check paths exist, because on staging they may not!
-            if (os.path.exists(dupe2_path)
-                and not _sameFile(dupe1_path, dupe2_path)):
+            if (file_exists(dupe2_id) and not same_file(dupe1_id, dupe2_id)):
                 log.error(
                         "SHA-1 collision found. LibraryFileContent %d and "
                         "%d have the same SHA1 and filesize, but are not "
@@ -412,12 +472,13 @@
     not referenced by any LibraryFileAlias entries.
 
     Note that a LibraryFileContent can only be accessed through a
-    LibraryFileAlias, so all entries in this state are garbage no matter
-    what their expires flag says.
+    LibraryFileAlias, so all entries in this state are garbage.
     """
     implements(ITunableLoop)
 
     def __init__(self, con):
+        self.swift_enabled = getFeatureFlag(
+            'librarian.swift.enabled') or False
         self.con = con
         self.index = 1
         self.total_deleted = 0
@@ -477,21 +538,38 @@
             WHERE id BETWEEN %s AND %s
             """, (self.index, self.index + chunksize - 1))
         for content_id in (row[0] for row in cur.fetchall()):
-            # Remove the file from disk, if it hasn't already been
+            removed = []
+
+            # Remove the file from disk, if it hasn't already been.
             path = get_file_path(content_id)
             try:
                 os.unlink(path)
+                removed.append('filesystem')
             except OSError as e:
                 if e.errno != errno.ENOENT:
                     raise
-                if config.librarian_server.upstream_host is None:
-                    # It is normal to have files in the database that
-                    # are not on disk if the Librarian has an upstream
-                    # Librarian, such as on staging. Don't annoy the
-                    # operator with noise in this case.
-                    log.info("%s already deleted", path)
-            else:
-                log.debug("Deleted %s", path)
+
+            # Remove the file from Swift, if it hasn't already been.
+            if self.swift_enabled:
+                container, name = swift.swift_location(content_id)
+                with swift.connection() as swift_connection:
+                    try:
+                        swift_connection.delete_object(container, name)
+                        removed.append('Swift')
+                    except swiftclient.ClientException as x:
+                        if x.http_status != 404:
+                            raise
+
+            if removed:
+                log.debug(
+                    "Deleted %s from %s", content_id, ' & '.join(removed))
+
+            elif config.librarian_server.upstream_host is None:
+                # It is normal to have files in the database that
+                # are not on disk if the Librarian has an upstream
+                # Librarian, such as on staging. Don't annoy the
+                # operator with noise in this case.
+                log.info("%s already deleted", path)
         self.con.rollback()
 
         self.index += chunksize
@@ -504,6 +582,13 @@
 
 
 def delete_unwanted_files(con):
+    delete_unwanted_disk_files(con)
+    swift_enabled = getFeatureFlag('librarian.swift.enabled') or False
+    if swift_enabled:
+        delete_unwanted_swift_files(con)
+
+
+def delete_unwanted_disk_files(con):
     """Delete files found on disk that have no corresponding record in the
     database.
 
@@ -511,6 +596,8 @@
     to avoid deleting files that have just been uploaded but have yet to have
     the database records committed.
     """
+    swift_enabled = getFeatureFlag('librarian.swift.enabled') or False
+
     cur = con.cursor()
 
     # Calculate all stored LibraryFileContent ids that we want to keep.
@@ -585,6 +672,7 @@
                 next_wanted_content_id = get_next_wanted_content_id()
 
                 if (config.librarian_server.upstream_host is None
+                        and not swift_enabled  # Maybe the file is in Swift.
                         and next_wanted_content_id is not None
                         and next_wanted_content_id < content_id):
                     log.error(
@@ -610,16 +698,114 @@
     # should exist but we didn't find on disk.
     if next_wanted_content_id == content_id:
         next_wanted_content_id = get_next_wanted_content_id()
+    if not swift_enabled:
+        while next_wanted_content_id is not None:
+            log.error(
+                "LibraryFileContent %d exists in the database but "
+                "was not found on disk." % next_wanted_content_id)
+            next_wanted_content_id = get_next_wanted_content_id()
+
+    log.info(
+            "Deleted %d files from disk that where no longer referenced "
+            "in the db" % removed_count
+            )
+
+
def swift_files():
    """Generate (container, object) tuples for all files stored in Swift.

    Each object is the dict from the Swift container listing, with at
    least 'name' and 'last_modified' keys. Results are yielded in
    numerical order of the object name (the LibraryFileContent id).
    """
    with swift.connection() as swift_connection:
        try:
            container_num = 0
            while True:
                # We generate the container names, rather than query the
                # server, because the mock Swift implementation doesn't
                # support that operation.
                container = swift.SWIFT_CONTAINER_PREFIX + str(container_num)
                objs = sorted(
                    swift_connection.get_container(
                        container, full_listing=True)[1],
                    key=lambda obj: int(obj['name']))
                for obj in objs:
                    yield (container, obj)
                container_num += 1
        except swiftclient.ClientException as x:
            # The first nonexistent container marks the end of the
            # sequentially-numbered store.
            if x.http_status == 404:
                return
            raise
+
+
def delete_unwanted_swift_files(con):
    """Delete files found in Swift that have no corresponding db record."""
    assert getFeatureFlag('librarian.swift.enabled')

    cur = con.cursor()

    # Calculate all stored LibraryFileContent ids that we want to keep.
    # Results are ordered so we don't have to suck them all in at once.
    cur.execute("""
        SELECT id FROM LibraryFileContent ORDER BY id
        """)

    def get_next_wanted_content_id():
        # Advance the db cursor; None once the wanted ids run out.
        result = cur.fetchone()
        if result is None:
            return None
        else:
            return result[0]

    removed_count = 0
    content_id = next_wanted_content_id = -1

    for container, obj in swift_files():
        name = obj['name']
        content_id = int(name)

        # Merge-join against the ordered wanted ids, reporting any
        # wanted file found neither on disk nor in Swift.
        while (next_wanted_content_id is not None
            and content_id > next_wanted_content_id):

            next_wanted_content_id = get_next_wanted_content_id()

            if (config.librarian_server.upstream_host is None
                    and next_wanted_content_id is not None
                    and next_wanted_content_id < content_id
                    and not os.path.exists(
                        get_file_path(next_wanted_content_id))):
                log.error(
                    "LibraryFileContent %d exists in the database but "
                    "was not found on disk nor in Swift."
                    % next_wanted_content_id)

        file_wanted = (
            next_wanted_content_id is not None
            and next_wanted_content_id == content_id)

        if not file_wanted:
            # Grace period: never remove objects modified within the
            # last day, as their database rows may not have been
            # committed yet.
            mod_time = iso8601.parse_date(
                obj['last_modified']).replace(tzinfo=None)
            if mod_time > _utcnow() - timedelta(days=1):
                log.debug(
                    "File %d not removed - created too recently", content_id)
            else:
                with swift.connection() as swift_connection:
                    swift_connection.delete_object(container, name)
                log.debug(
                    'Deleted ({}, {}) from Swift'.format(container, name))
                removed_count += 1

    # Report wanted ids beyond the last object found in Swift.
    if next_wanted_content_id == content_id:
        next_wanted_content_id = get_next_wanted_content_id()
    while next_wanted_content_id is not None:
        log.error(
            "LibraryFileContent {} exists in the database but was not "
            "found in Swift.".format(next_wanted_content_id))
        next_wanted_content_id = get_next_wanted_content_id()

    # Fixed log message: was "...where no longer referenced" + "in the
    # db", which rendered as "referencedin the db" with the wrong word.
    log.info(
        "Deleted {} files from Swift that were no longer referenced "
        "in the db".format(removed_count))
 
 
 def get_file_path(content_id):

=== modified file 'lib/lp/services/librarianserver/storage.py'
--- lib/lp/services/librarianserver/storage.py	2013-06-20 05:50:00 +0000
+++ lib/lp/services/librarianserver/storage.py	2013-08-15 13:52:53 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2010 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2013 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -9,11 +9,18 @@
 import shutil
 import tempfile
 
+from swiftclient import client as swiftclient
+from twisted.python import log
+from twisted.internet import defer
+from twisted.internet.threads import deferToThread
+from twisted.web.static import StaticProducer
+
 from lp.registry.model.product import Product
 from lp.services.config import dbconfig
 from lp.services.database import write_transaction
 from lp.services.database.interfaces import IStore
 from lp.services.database.postgresql import ConnectionString
+from lp.services.librarianserver import swift
 
 
 __all__ = [
@@ -67,6 +74,32 @@
     def hasFile(self, fileid):
         return os.access(self._fileLocation(fileid), os.F_OK)
 
+    CHUNK_SIZE = StaticProducer.bufferSize
+
+    @defer.inlineCallbacks
+    def open(self, fileid):
+        # First, try and stream the file from Swift.
+        container, name = swift.swift_location(fileid)
+        swift_connection = swift.connection_pool.get()
+        try:
+            headers, chunks = yield deferToThread(
+                swift_connection.get_object,
+                container, name, resp_chunk_size=self.CHUNK_SIZE)
+            swift_stream = TxSwiftStream(swift_connection, chunks)
+            defer.returnValue(swift_stream)
+        except swiftclient.ClientException as x:
+            if x.http_status != 404:
+                log.err(x)
+        except Exception as x:
+            log.err(x)
+
+        # If Swift failed, for any reason, try and stream the data from
+        # disk. In particular, files cannot be found in Swift until
+        # librarian-feed-swift.py has put them in there.
+        path = self._fileLocation(fileid)
+        if os.path.exists(path):
+            defer.returnValue(open(path, 'rb'))
+
     def _fileLocation(self, fileid):
         return os.path.join(self.directory, _relFileLocation(str(fileid)))
 
@@ -77,6 +110,41 @@
         return self.library.getAlias(aliasid, token, path)
 
 
class TxSwiftStream(swift.SwiftStream):
    """Twisted-friendly variant of SwiftStream.

    Identical to the parent except that read() runs the blocking
    chunk fetch in a thread and returns a Deferred.
    """

    @defer.inlineCallbacks
    def read(self, size):
        """Return (via deferred) up to size bytes; '' signals EOF."""
        if self.closed:
            raise ValueError('I/O operation on closed file')

        if self._swift_connection is None:
            defer.returnValue('')  # EOF already reached, connection returned.

        if size == 0:
            defer.returnValue('')

        return_chunks = []
        return_size = 0

        # Accumulate whole or partial chunks until size bytes are
        # gathered or the underlying chunk iterator is exhausted.
        while return_size < size:
            if not self._chunk:
                # Blocking network read happens off the reactor thread.
                self._chunk = yield deferToThread(self._next_chunk)
                if not self._chunk:
                    # If we have drained the data successfully,
                    # the connection can be reused saving on auth
                    # handshakes.
                    swift.connection_pool.put(self._swift_connection)
                    self._swift_connection = None
                    self._chunks = None
                    break
            split = size - return_size
            return_chunks.append(self._chunk[:split])
            self._chunk = self._chunk[split:]
            return_size += len(return_chunks[-1])

        self._offset += return_size
        defer.returnValue(''.join(return_chunks))
+
+
 class LibraryFileUpload(object):
     """A file upload from a client."""
     srcDigest = None

=== added file 'lib/lp/services/librarianserver/swift.py'
--- lib/lp/services/librarianserver/swift.py	1970-01-01 00:00:00 +0000
+++ lib/lp/services/librarianserver/swift.py	2013-08-15 13:52:53 +0000
@@ -0,0 +1,253 @@
+# Copyright 2013 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Move files from Librarian disk storage into Swift."""
+
+__metaclass__ = type
+__all__ = [
+    'to_swift', 'filesystem_path', 'swift_location',
+    'connection', 'connection_pool', 'SWIFT_CONTAINER_PREFIX',
+    ]
+
+from contextlib import contextmanager
+import os.path
+import sys
+
+from swiftclient import client as swiftclient
+
+from lp.services.config import config
+
+
+SWIFT_CONTAINER_PREFIX = 'librarian_'
+
+
def to_swift(log, start_lfc_id=None, end_lfc_id=None, remove=False):
    '''Copy a range of Librarian files from disk into Swift.

    start and end identify the range of LibraryFileContent.id to
    migrate (inclusive).

    If remove is True, files are removed from disk after being copied into
    Swift.
    '''
    swift_connection = connection_pool.get()
    fs_root = os.path.abspath(config.librarian_server.root)

    if start_lfc_id is None:
        start_lfc_id = 1
    if end_lfc_id is None:
        end_lfc_id = sys.maxint
        end_str = 'MAXINT'
    else:
        end_str = str(end_lfc_id)

    log.info("Walking disk store {} from {} to {}, inclusive".format(
        fs_root, start_lfc_id, end_str))

    start_fs_path = filesystem_path(start_lfc_id)
    end_fs_path = filesystem_path(end_lfc_id)

    # Walk the Librarian on disk file store, searching for matching
    # files that may need to be copied into Swift. We need to follow
    # symlinks as they are being used to span disk partitions.
    for dirpath, dirnames, filenames in os.walk(fs_root, followlinks=True):

        # Don't recurse if we know this directory contains no matching
        # files.
        if (start_fs_path[:len(dirpath)] > dirpath
            or end_fs_path[:len(dirpath)] < dirpath):
            dirnames[:] = []
            continue

        log.debug('Scanning {} for matching files'.format(dirpath))

        for filename in sorted(filenames):
            fs_path = os.path.join(dirpath, filename)
            if fs_path < start_fs_path:
                continue
            if fs_path > end_fs_path:
                break
            rel_fs_path = fs_path[len(fs_root) + 1:]

            # Reverse engineer the LibraryFileContent.id from the
            # file's path. Warn about and skip bad filenames.
            hex_lfc = ''.join(rel_fs_path.split('/'))
            if len(hex_lfc) != 8:
                log.warning(
                    'Filename length fail, skipping {}'.format(fs_path))
                continue
            try:
                lfc = int(hex_lfc, 16)
            except ValueError:
                log.warning('Invalid hex fail, skipping {}'.format(fs_path))
                continue

            log.debug('Found {} ({})'.format(lfc, filename))

            container, obj_name = swift_location(lfc)

            # Create the destination container on demand.
            try:
                swift_connection.head_container(container)
                log.debug2('{} container already exists'.format(container))
            except swiftclient.ClientException as x:
                if x.http_status != 404:
                    raise
                log.info('Creating {} container'.format(container))
                swift_connection.put_container(container)

            # Copy the file into Swift unless it is already there.
            try:
                swift_connection.head_object(container, obj_name)
                log.debug(
                    "{} already exists in Swift({}, {})".format(
                        lfc, container, obj_name))
            except swiftclient.ClientException as x:
                if x.http_status != 404:
                    raise
                log.info(
                    'Putting {} into Swift ({}, {})'.format(
                        lfc, container, obj_name))
                # Close the source file promptly rather than relying
                # on garbage collection (previously the handle was
                # left open).
                with open(fs_path, 'rb') as fs_file:
                    swift_connection.put_object(
                        container, obj_name,
                        fs_file, os.path.getsize(fs_path))

            if remove:
                os.unlink(fs_path)

    # The connection is in a usable state if we got this far, so
    # return it to the pool for reuse (previously it was leaked).
    connection_pool.put(swift_connection)
+
+
def swift_location(lfc_id):
    '''Return the (container, obj_name) used to store a file.

    Per https://answers.launchpad.net/swift/+question/181977, we can't
    simply stuff everything into one container.
    '''
    assert isinstance(lfc_id, (int, long)), 'Not a LibraryFileContent.id'

    # Don't change this unless you are also going to rebuild the Swift
    # storage, as objects will no longer be found in the expected
    # container. This value and the container prefix are deliberately
    # hard coded to avoid cockups with values specified in config files.
    max_objects_per_container = 1000000

    # Files are spread across sequentially-numbered containers of at
    # most one million objects each.
    container_num = lfc_id // max_objects_per_container

    return (SWIFT_CONTAINER_PREFIX + str(container_num), str(lfc_id))
+
+
def filesystem_path(lfc_id):
    """Return the local disk path where the given content id is stored."""
    # Imported here to avoid a circular import with the storage module.
    from lp.services.librarianserver.storage import _relFileLocation
    relative = _relFileLocation(lfc_id)
    return os.path.join(config.librarian_server.root, relative)
+
+
class SwiftStream:
    """Read-only file-like wrapper around a swiftclient chunk iterator.

    Supports sequential read(), tell() and forward seek() only. Once
    the stream has been fully drained, the underlying connection is
    returned to the pool for reuse.
    """

    def __init__(self, swift_connection, chunks):
        self._swift_connection = swift_connection
        self._chunks = chunks  # Generator from swiftclient.get_object()

        self.closed = False
        self._offset = 0
        self._chunk = None

    def read(self, size):
        """Read up to size bytes; '' signals EOF."""
        if self.closed:
            raise ValueError('I/O operation on closed file')

        if self._swift_connection is None:
            return ''  # EOF already reached, connection returned.

        if size == 0:
            return ''

        return_chunks = []
        return_size = 0

        # Accumulate whole or partial chunks until size bytes are
        # gathered or the chunk iterator is exhausted.
        while return_size < size:
            if not self._chunk:
                self._chunk = self._next_chunk()
                if not self._chunk:
                    # If we have drained the data successfully,
                    # the connection can be reused saving on auth
                    # handshakes.
                    connection_pool.put(self._swift_connection)
                    self._swift_connection = None
                    self._chunks = None
                    break
            split = size - return_size
            return_chunks.append(self._chunk[:split])
            self._chunk = self._chunk[split:]
            return_size += len(return_chunks[-1])

        self._offset += return_size
        return ''.join(return_chunks)

    def _next_chunk(self):
        """Return the next chunk, or None when exhausted."""
        try:
            # next() builtin rather than the .next() method, which is
            # Python-2-only; behavior is identical under Python 2.6+.
            return next(self._chunks)
        except StopIteration:
            return None

    def close(self):
        # The connection is discarded rather than pooled, as it may
        # still have undrained response data pending.
        self.closed = True
        self._swift_connection = None

    def seek(self, offset):
        """Seek forward by discarding data; rewinding is unsupported."""
        if offset < self._offset:
            raise NotImplementedError('rewind')  # Rewind not supported
        else:
            self.read(offset - self._offset)

    def tell(self):
        return self._offset
+
+
class ConnectionPool:
    """A small LIFO pool of reusable Swift connections.

    Reusing connections avoids repeated auth handshakes. At most
    MAX_POOL_SIZE connections are retained; the oldest are discarded
    first when the pool overflows.
    """

    MAX_POOL_SIZE = 10

    def __init__(self):
        self.clear()

    def clear(self):
        """Discard all pooled connections."""
        self._pool = []

    def get(self):
        '''Return a connection from the pool, or a fresh connection.'''
        if self._pool:
            return self._pool.pop()
        return self._new_connection()

    def put(self, swift_connection):
        '''Put a connection back in the pool for reuse.

        Only call this if the connection is in a usable state. If an
        exception has been raised (apart from a 404), don't trust the
        swift_connection and throw it away.
        '''
        if swift_connection in self._pool:
            return  # Already pooled; nothing to do.
        self._pool.append(swift_connection)
        # Trim the oldest connections down to the size limit.
        del self._pool[:-self.MAX_POOL_SIZE]

    def _new_connection(self):
        # Credentials come from the standard OpenStack environment
        # variables.
        env = os.environ.get
        return swiftclient.Connection(
            authurl=env('OS_AUTH_URL', None),
            user=env('OS_USERNAME', None),
            key=env('OS_PASSWORD', None),
            tenant_name=env('OS_TENANT_NAME', None),
            auth_version='2.0',
            )
+
+
+connection_pool = ConnectionPool()
+
+
@contextmanager
def connection():
    """Context manager yielding a pooled Swift connection.

    The connection is returned to the pool only if the block exits
    cleanly; an exception in the block skips the put(), discarding a
    connection that may be in an unusable state.
    """
    con = connection_pool.get()
    yield con
    connection_pool.put(con)

=== modified file 'lib/lp/services/librarianserver/tests/test_gc.py'
--- lib/lp/services/librarianserver/tests/test_gc.py	2013-06-14 04:51:40 +0000
+++ lib/lp/services/librarianserver/tests/test_gc.py	2013-08-15 13:52:53 +0000
@@ -6,7 +6,7 @@
 __metaclass__ = type
 
 from cStringIO import StringIO
-from datetime import timedelta
+from datetime import datetime, timedelta
 import os
 import shutil
 from subprocess import (
@@ -18,6 +18,7 @@
 import tempfile
 
 from sqlobject import SQLObjectNotFound
+from swiftclient import client as swiftclient
 import transaction
 
 from lp.services.config import config
@@ -31,19 +32,30 @@
     LibraryFileAlias,
     LibraryFileContent,
     )
-from lp.services.librarianserver import librariangc
+from lp.services.librarianserver import (
+    librariangc,
+    swift,
+    )
 from lp.services.log.logger import BufferLogger
+from lp.services.features.testing import FeatureFixture
 from lp.services.utils import utc_now
 from lp.testing import TestCase
 from lp.testing.dbuser import switch_dbuser
-from lp.testing.layers import LaunchpadZopelessLayer
-
-
-class TestLibrarianGarbageCollection(TestCase):
+from lp.testing.layers import (
+    LaunchpadZopelessLayer,
+    LibrarianLayer,
+    )
+from lp.testing.swift.fixture import SwiftFixture
+
+
+class TestLibrarianGarbageCollectionBase:
+    """Test garbage collection code that operates differently with
+    Swift enabled. These tests need to be run under both environments.
+    """
     layer = LaunchpadZopelessLayer
 
     def setUp(self):
-        super(TestLibrarianGarbageCollection, self).setUp()
+        super(TestLibrarianGarbageCollectionBase, self).setUp()
         self.client = LibrarianClient()
         self.patch(librariangc, 'log', BufferLogger())
 
@@ -65,12 +77,6 @@
         switch_dbuser(config.librarian_gc.dbuser)
         self.ztm = self.layer.txn
 
-        # Make sure the files exist. We do this in setup, because we
-        # need to use the get_file_path method later in the setup and we
-        # want to be sure it is working correctly.
-        path = librariangc.get_file_path(self.f1_id)
-        self.failUnless(os.path.exists(path), "Librarian uploads failed")
-
         # Make sure that every file the database knows about exists on disk.
         # We manually remove them for tests that need to cope with missing
         # library items.
@@ -93,7 +99,7 @@
         self.con.rollback()
         self.con.close()
         del self.con
-        super(TestLibrarianGarbageCollection, self).tearDown()
+        super(TestLibrarianGarbageCollectionBase, self).tearDown()
 
     def _makeDupes(self):
         """Create two duplicate LibraryFileContent entries with one
@@ -133,6 +139,11 @@
 
         return f1_id, f2_id
 
+    def test_files_exist(self):
+        # Confirm the files we expect created by the test harness
+        # actually exist.
+        self.failUnless(self.file_exists(self.f1_id))
+
     def test_MergeDuplicates(self):
         # Merge the duplicates
         librariangc.merge_duplicates(self.con)
@@ -324,14 +335,13 @@
         self.ztm.abort()
 
         # Make sure the file exists on disk
-        path = librariangc.get_file_path(unreferenced_id)
-        self.failUnless(os.path.exists(path))
+        self.failUnless(self.file_exists(unreferenced_id))
 
         # Delete unreferenced content
         librariangc.delete_unreferenced_content(self.con)
 
         # Make sure the file is gone
-        self.failIf(os.path.exists(path))
+        self.failIf(self.file_exists(unreferenced_id))
 
         # delete_unreferenced_content should have committed
         self.ztm.begin()
@@ -386,18 +396,17 @@
         self.ztm.abort()
 
         # Make sure the file exists on disk
-        path = librariangc.get_file_path(unreferenced_id)
-        self.failUnless(os.path.exists(path))
+        self.failUnless(self.file_exists(unreferenced_id))
 
         # Remove the file from disk
-        os.unlink(path)
-        self.failIf(os.path.exists(path))
+        self.remove_file(unreferenced_id)
+        self.failIf(self.file_exists(unreferenced_id))
 
         # Delete unreferenced content
         librariangc.delete_unreferenced_content(self.con)
 
         # Make sure the file is gone
-        self.failIf(os.path.exists(path))
+        self.failIf(self.file_exists(unreferenced_id))
 
         # delete_unreferenced_content should have committed
         self.ztm.begin()
@@ -440,8 +449,7 @@
                 """, (content_id,))
         self.ztm.commit()
 
-        path = librariangc.get_file_path(content_id)
-        self.failUnless(os.path.exists(path))
+        self.failUnless(self.file_exists(content_id))
 
         # Ensure delete_unreferenced_files does not remove the file, because
         # it will have just been created (has a recent date_created). There
@@ -449,22 +457,28 @@
         # bothering to remove the file to avoid the race condition where the
         # garbage collector is run whilst a file is being uploaded.
         librariangc.delete_unwanted_files(self.con)
-        self.failUnless(os.path.exists(path))
+        self.failUnless(self.file_exists(content_id))
 
         # To test removal does occur when we want it to, we need to trick
         # the garbage collector into thinking it is tomorrow.
         org_time = librariangc.time
+        org_utcnow = librariangc._utcnow
 
         def tomorrow_time():
             return org_time() + 24 * 60 * 60 + 1
 
+        def tomorrow_utcnow():
+            return datetime.utcnow() + timedelta(days=1, seconds=1)
+
         try:
             librariangc.time = tomorrow_time
+            librariangc._utcnow = tomorrow_utcnow
             librariangc.delete_unwanted_files(self.con)
         finally:
             librariangc.time = org_time
+            librariangc._utcnow = org_utcnow
 
-        self.failIf(os.path.exists(path))
+        self.failIf(self.file_exists(content_id))
 
         # Make sure nothing else has been removed from disk
         self.ztm.begin()
@@ -473,8 +487,109 @@
                 SELECT id FROM LibraryFileContent
                 """)
         for content_id in (row[0] for row in cur.fetchall()):
-            path = librariangc.get_file_path(content_id)
-            self.failUnless(os.path.exists(path))
+            self.failUnless(self.file_exists(content_id))
+
+    def test_delete_unwanted_files_bug437084(self):
+        # There was a bug where delete_unwanted_files() would die
+        # if the last file found on disk was unwanted.
+        switch_dbuser('testadmin')
+        content = 'foo'
+        self.client.addFile(
+            'foo.txt', len(content), StringIO(content), 'text/plain')
+        # Roll back the database changes, leaving the file on disk.
+        transaction.abort()
+
+        switch_dbuser(config.librarian_gc.dbuser)
+
+        # This should cope.
+        librariangc.delete_unwanted_files(self.con)
+
+    def test_delete_unwanted_files_follows_symlinks(self):
+        # In production, our tree has symlinks in it now.  We need to be able
+        # to cope.
+        # First, let's make sure we have some trash.
+        switch_dbuser('testadmin')
+        content = 'foo'
+        self.client.addFile(
+            'foo.txt', len(content), StringIO(content), 'text/plain')
+        # Roll back the database changes, leaving the file on disk.
+        transaction.abort()
+
+        switch_dbuser(config.librarian_gc.dbuser)
+
+        # Now, we will move the directory containing the trash somewhere else
+        # and make a symlink to it.
+        original = os.path.join(config.librarian_server.root, '00', '00')
+        newdir = tempfile.mkdtemp()
+        alt = os.path.join(newdir, '00')
+        shutil.move(original, alt)
+        os.symlink(alt, original)
+
+        # Now we will do our thing.  This is the actual test.  It used to
+        # fail.
+        librariangc.delete_unwanted_files(self.con)
+
+        # Clean up.
+        os.remove(original)
+        shutil.move(alt, original)
+        shutil.rmtree(newdir)
+
+    def test_cronscript(self):
+        script_path = os.path.join(
+                config.root, 'cronscripts', 'librarian-gc.py'
+                )
+        cmd = [sys.executable, script_path, '-q']
+        process = Popen(cmd, stdout=PIPE, stderr=STDOUT, stdin=PIPE)
+        (script_output, _empty) = process.communicate()
+        self.failUnlessEqual(
+            process.returncode, 0, 'Error: %s' % script_output)
+        self.failUnlessEqual(script_output, '')
+
+        # Make sure that our example files have been garbage collected
+        self.ztm.begin()
+        self.assertRaises(SQLObjectNotFound, LibraryFileAlias.get, self.f1_id)
+        self.assertRaises(SQLObjectNotFound, LibraryFileAlias.get, self.f2_id)
+
+        # And make sure stuff that *is* referenced remains
+        LibraryFileAlias.get(2)
+        cur = cursor()
+        cur.execute("SELECT count(*) FROM LibraryFileAlias")
+        count = cur.fetchone()[0]
+        self.failIfEqual(count, 0)
+        cur.execute("SELECT count(*) FROM LibraryFileContent")
+        count = cur.fetchone()[0]
+        self.failIfEqual(count, 0)
+
+    def test_confirm_no_clock_skew(self):
+        # There should not be any clock skew when running the test suite.
+        librariangc.confirm_no_clock_skew(self.con)
+
+        # To test that this function raises an exception when it should,
+        # we trick the garbage collector into thinking it is tomorrow.
+        org_time = librariangc.time
+
+        def tomorrow_time():
+            return org_time() + 24 * 60 * 60 + 1
+
+        try:
+            librariangc.time = tomorrow_time
+            self.assertRaises(
+                Exception, librariangc.confirm_no_clock_skew, (self.con,)
+                )
+        finally:
+            librariangc.time = org_time
+
+
+class TestDiskLibrarianGarbageCollection(
+    TestLibrarianGarbageCollectionBase, TestCase):
+
+    def file_exists(self, content_id):
+        path = librariangc.get_file_path(content_id)
+        return os.path.exists(path)
+
+    def remove_file(self, content_id):
+        path = librariangc.get_file_path(content_id)
+        os.unlink(path)
 
     def test_deleteUnwantedFilesIgnoresNoise(self):
         # Directories with invalid names in the storage area are
@@ -536,95 +651,47 @@
             "WARNING Ignoring invalid directory zz",
             librariangc.log.getLogBuffer())
 
-    def test_delete_unwanted_files_bug437084(self):
-        # There was a bug where delete_unwanted_files() would die
-        # if the last file found on disk was unwanted.
-        switch_dbuser('testadmin')
-        content = 'foo'
-        self.client.addFile(
-            'foo.txt', len(content), StringIO(content), 'text/plain')
-        # Roll back the database changes, leaving the file on disk.
-        transaction.abort()
-
-        switch_dbuser(config.librarian_gc.dbuser)
-
-        # This should cope.
-        librariangc.delete_unwanted_files(self.con)
-
-    def test_delete_unwanted_files_follows_symlinks(self):
-        # In production, our tree has symlinks in it now.  We need to be able
-        # to cope.
-        # First, let's make sure we have some trash.
-        switch_dbuser('testadmin')
-        content = 'foo'
-        self.client.addFile(
-            'foo.txt', len(content), StringIO(content), 'text/plain')
-        # Roll back the database changes, leaving the file on disk.
-        transaction.abort()
-
-        switch_dbuser(config.librarian_gc.dbuser)
-
-        # Now, we will move the directory containing the trash somewhere else
-        # and make a symlink to it.
-        original = os.path.join(config.librarian_server.root, '00', '00')
-        newdir = tempfile.mkdtemp()
-        alt = os.path.join(newdir, '00')
-        shutil.move(original, alt)
-        os.symlink(alt, original)
-
-        # Now we will do our thing.  This is the actual test.  It used to
-        # fail.
-        librariangc.delete_unwanted_files(self.con)
-
-        # Clean up.
-        os.remove(original)
-        shutil.move(alt, original)
-        shutil.rmtree(newdir)
-
-    def test_cronscript(self):
-        script_path = os.path.join(
-                config.root, 'cronscripts', 'librarian-gc.py'
-                )
-        cmd = [sys.executable, script_path, '-q']
-        process = Popen(cmd, stdout=PIPE, stderr=STDOUT, stdin=PIPE)
-        (script_output, _empty) = process.communicate()
-        self.failUnlessEqual(
-            process.returncode, 0, 'Error: %s' % script_output)
-        self.failUnlessEqual(script_output, '')
-
-        # Make sure that our example files have been garbage collected
-        self.ztm.begin()
-        self.assertRaises(SQLObjectNotFound, LibraryFileAlias.get, self.f1_id)
-        self.assertRaises(SQLObjectNotFound, LibraryFileAlias.get, self.f2_id)
-
-        # And make sure stuff that *is* referenced remains
-        LibraryFileAlias.get(2)
-        cur = cursor()
-        cur.execute("SELECT count(*) FROM LibraryFileAlias")
-        count = cur.fetchone()[0]
-        self.failIfEqual(count, 0)
-        cur.execute("SELECT count(*) FROM LibraryFileContent")
-        count = cur.fetchone()[0]
-        self.failIfEqual(count, 0)
-
-    def test_confirm_no_clock_skew(self):
-        # There should not be any clock skew when running the test suite.
-        librariangc.confirm_no_clock_skew(self.con)
-
-        # To test this function raises an excption when it should,
-        # the garbage collector into thinking it is tomorrow.
-        org_time = librariangc.time
-
-        def tomorrow_time():
-            return org_time() + 24 * 60 * 60 + 1
-
-        try:
-            librariangc.time = tomorrow_time
-            self.assertRaises(
-                Exception, librariangc.confirm_no_clock_skew, (self.con,)
-                )
-        finally:
-            librariangc.time = org_time
+
+class TestSwiftLibrarianGarbageCollection(
+    TestLibrarianGarbageCollectionBase, TestCase):
+    """Swift specific garbage collection tests."""
+    def setUp(self):
+        # Once we switch entirely to Swift, we can move this setup into
+        # the lp.testing.layers code and save the per-test overhead.
+
+        self.swift_fixture = self.useFixture(SwiftFixture())
+        self.addCleanup(swift.connection_pool.clear)
+
+        self.useFixture(FeatureFixture({'librarian.swift.enabled': True}))
+
+        # Restart the Librarian so it picks up the OS_* environment
+        # variables.
+        LibrarianLayer.librarian_fixture.killTac()
+        LibrarianLayer.librarian_fixture.setUp()
+
+        super(TestSwiftLibrarianGarbageCollection, self).setUp()
+
+        # Move files into Swift.
+        path = librariangc.get_file_path(self.f1_id)
+        assert os.path.exists(path), "Librarian uploads failed"
+        swift.to_swift(BufferLogger(), remove=True)
+        assert not os.path.exists(path), "to_swift failed to move files"
+
+    def file_exists(self, content_id):
+        container, name = swift.swift_location(content_id)
+        with swift.connection() as swift_connection:
+            try:
+                swift_connection.head_object(container, name)
+                return True
+            except swiftclient.ClientException as x:
+                if x.http_status == 404:
+                    return False
+                raise
+
+    def remove_file(self, content_id):
+        container, name = swift.swift_location(content_id)
+        with swift.connection() as swift_connection:
+            swift_connection.delete_object(container, name)
 
 
 class TestBlobCollection(TestCase):

=== added file 'lib/lp/services/librarianserver/tests/test_swift.py'
--- lib/lp/services/librarianserver/tests/test_swift.py	1970-01-01 00:00:00 +0000
+++ lib/lp/services/librarianserver/tests/test_swift.py	2013-08-15 13:52:53 +0000
@@ -0,0 +1,182 @@
+# Copyright 2013 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Librarian disk to Swift storage tests."""
+
+__metaclass__ = type
+
+from cStringIO import StringIO
+import os.path
+
+from mock import patch
+import transaction
+
+from lp.services.database import write_transaction
+from lp.services.database.interfaces import IStore
+from lp.services.features.testing import FeatureFixture
+from lp.services.librarian.client import LibrarianClient
+from lp.services.librarian.model import LibraryFileAlias
+from lp.services.librarianserver.storage import LibrarianStorage
+from lp.services.log.logger import BufferLogger
+from lp.testing import TestCase
+from lp.testing.layers import LaunchpadZopelessLayer, LibrarianLayer
+from lp.testing.swift.fixture import SwiftFixture
+
+from lp.services.librarianserver import swift
+
+
+class TestFeedSwift(TestCase):
+    layer = LaunchpadZopelessLayer
+
+    def setUp(self):
+        super(TestFeedSwift, self).setUp()
+        self.swift_fixture = self.useFixture(SwiftFixture())
+        self.useFixture(FeatureFixture({'librarian.swift.enabled': True}))
+        transaction.commit()
+
+        self.addCleanup(swift.connection_pool.clear)
+
+        # Restart the Librarian so it picks up the OS_* environment
+        # variables.
+        LibrarianLayer.librarian_fixture.killTac()
+        LibrarianLayer.librarian_fixture.setUp()
+
+        # Add some files.
+        self.librarian_client = LibrarianClient()
+        self.contents = [str(i) * i for i in range(1, 5)]
+        self.lfa_ids = [
+            self.add_file('file_{}'.format(i), content)
+            for content in self.contents]
+        self.lfas = [
+            IStore(LibraryFileAlias).get(LibraryFileAlias, lfa_id)
+                for lfa_id in self.lfa_ids]
+        self.lfcs = [lfa.content for lfa in self.lfas]
+
+    def tearDown(self):
+        super(TestFeedSwift, self).tearDown()
+        # Restart the Librarian so it picks up the feature flag change.
+        LibrarianLayer.librarian_fixture.killTac()
+        LibrarianLayer.librarian_fixture.setUp()
+
+    @write_transaction
+    def add_file(self, name, content, content_type='text/plain'):
+        return self.librarian_client.addFile(
+            name=name, size=len(content), file=StringIO(content),
+            contentType=content_type)
+
+    def test_copy_to_swift(self):
+        log = BufferLogger()
+
+        # Confirm that files exist on disk where we expect to find them.
+        for lfc in self.lfcs:
+            path = swift.filesystem_path(lfc.id)
+            self.assert_(os.path.exists(path))
+
+        # Copy all the files into Swift.
+        swift.to_swift(log)  # remove == False
+
+        # Confirm that files exist on disk where we expect to find them.
+        for lfc in self.lfcs:
+            path = swift.filesystem_path(lfc.id)
+            self.assert_(os.path.exists(path))
+
+        # Confirm all the files are also in Swift.
+        swift_client = self.swift_fixture.connect()
+        for lfc, contents in zip(self.lfcs, self.contents):
+            container, name = swift.swift_location(lfc.id)
+            headers, obj = swift_client.get_object(container, name)
+            self.assertEqual(contents, obj, 'Did not round trip')
+
+        # Running again does nothing, in particular does not reupload
+        # the files to Swift.
+        con_patch = patch.object(
+            swift.swiftclient.Connection, 'put_object',
+            side_effect=AssertionError('do not call'))
+        with con_patch:
+            swift.to_swift(log)  # remove == False
+
+    def test_move_to_swift(self):
+        log = BufferLogger()
+
+        # Confirm that files exist on disk where we expect to find them.
+        for lfc in self.lfcs:
+            path = swift.filesystem_path(lfc.id)
+            self.assert_(os.path.exists(path))
+
+        # Migrate all the files into Swift.
+        swift.to_swift(log, remove=True)
+
+        # Confirm that all the files have gone from disk.
+        for lfc in self.lfcs:
+            self.failIf(os.path.exists(swift.filesystem_path(lfc.id)))
+
+        # Confirm all the files are in Swift.
+        swift_client = self.swift_fixture.connect()
+        for lfc, contents in zip(self.lfcs, self.contents):
+            container, name = swift.swift_location(lfc.id)
+            headers, obj = swift_client.get_object(container, name)
+            self.assertEqual(contents, obj, 'Did not round trip')
+
+    def test_librarian_serves_from_swift(self):
+        log = BufferLogger()
+
+        # Move all the files into Swift and off the file system.
+        swift.to_swift(log, remove=True)
+
+        # Confirm we can still access the files from the Librarian.
+        for lfa_id, content in zip(self.lfa_ids, self.contents):
+            data = self.librarian_client.getFileByAlias(lfa_id).read()
+            self.assertEqual(content, data)
+
+    def test_librarian_serves_from_disk(self):
+        # Ensure the Librarian falls back to serving files from disk
+        # when they cannot be found in the Swift server. Note that other
+        # Librarian tests do not have Swift active, so this test is not
+        # redundant.
+        for lfa_id, content in zip(self.lfa_ids, self.contents):
+            data = self.librarian_client.getFileByAlias(lfa_id).read()
+            self.assertEqual(content, data)
+
+    def test_large_binary_files_from_disk(self):
+        # Generate a large blob, including null bytes for kicks.
+        size = 512 * 1024  # 512KB
+        expected_content = ''.join(chr(i % 256) for i in range(0, size))
+        lfa_id = self.add_file('hello_bigboy.xls', expected_content)
+
+        # Data round trips when served from disk.
+        lfa = self.librarian_client.getFileByAlias(lfa_id)
+        self.assertEqual(expected_content, lfa.read())
+
+    def test_large_binary_files_from_swift(self):
+        # Generate large blob, multiple of the chunk size.
+        # Including null bytes for kicks.
+        size = LibrarianStorage.CHUNK_SIZE * 50
+        self.assert_(size > 1024 * 1024)
+        expected_content = ''.join(chr(i % 256) for i in range(0, size))
+        lfa_id = self.add_file('hello_bigboy.xls', expected_content)
+
+        # This data size is a multiple of our chunk size.
+        self.assertEqual(
+            0, len(expected_content) % LibrarianStorage.CHUNK_SIZE)
+
+        # Data round trips when served from Swift.
+        swift.to_swift(BufferLogger(), remove=True)
+        lfa = self.librarian_client.getFileByAlias(lfa_id)
+        self.assertEqual(expected_content, lfa.read())
+
+    def test_large_binary_files_from_swift_offset(self):
+        # Generate large blob, but NOT a multiple of the chunk size.
+        # Including null bytes for kicks.
+        size = LibrarianStorage.CHUNK_SIZE * 50 + 1
+        self.assert_(size > 1024 * 1024)
+        expected_content = ''.join(chr(i % 256) for i in range(0, size))
+        lfa_id = self.add_file('hello_bigboy.xls', expected_content)
+
+        # This data size is NOT a multiple of our chunk size.
+        self.assertNotEqual(
+            0, len(expected_content) % LibrarianStorage.CHUNK_SIZE)
+
+        # Data round trips when served from Swift.
+        swift.to_swift(BufferLogger(), remove=True)
+        lfa = self.librarian_client.getFileByAlias(lfa_id)
+        self.assertEqual(expected_content, lfa.read())

=== modified file 'lib/lp/services/librarianserver/web.py'
--- lib/lp/services/librarianserver/web.py	2012-12-20 07:44:56 +0000
+++ lib/lp/services/librarianserver/web.py	2013-08-15 13:52:53 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2009-2013 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 __metaclass__ = type
@@ -8,9 +8,11 @@
 from urlparse import urlparse
 
 from storm.exceptions import DisconnectionError
+from twisted.internet import defer
 from twisted.internet.threads import deferToThread
 from twisted.python import log
 from twisted.web import (
+    http,
     proxy,
     resource,
     server,
@@ -23,6 +25,7 @@
     read_transaction,
     write_transaction,
     )
+from lp.services.features import getFeatureFlag
 from lp.services.librarian.client import url_path_quote
 from lp.services.librarian.utils import guess_librarian_encoding
 
@@ -115,11 +118,18 @@
 
         token = request.args.get('token', [None])[0]
         path = request.path
-        deferred = deferToThread(
-            self._getFileAlias, self.aliasID, token, path)
-        deferred.addCallback(
-                self._cb_getFileAlias, filename, request
-                )
+        swift_enabled = getFeatureFlag('librarian.swift.enabled') or False
+        if swift_enabled:
+            deferred = deferToThread(
+                self._getFileAlias_swift, self.aliasID, token, path)
+            deferred.addCallback(
+                self._cb_getFileAlias_swift, filename, request)
+        else:
+            deferred = deferToThread(
+                self._getFileAlias, self.aliasID, token, path)
+            deferred.addCallback(
+                    self._cb_getFileAlias, filename, request
+                    )
         deferred.addErrback(self._eb_getFileAlias)
         return util.DeferredResource(deferred)
 
@@ -132,6 +142,16 @@
         except LookupError:
             raise NotFound
 
+    @write_transaction
+    def _getFileAlias_swift(self, aliasID, token, path):
+        try:
+            alias = self.storage.getFileAlias(aliasID, token, path)
+            return (alias.contentID, alias.filename,
+                alias.mimetype, alias.date_created, alias.content.filesize,
+                alias.restricted)
+        except LookupError:
+            raise NotFound
+
     def _eb_getFileAlias(self, failure):
         err = failure.trap(NotFound, DisconnectionError)
         if err == DisconnectionError:
@@ -177,6 +197,43 @@
             return proxy.ReverseProxyResource(self.upstreamHost,
                                               self.upstreamPort, request.path)
 
+    @defer.inlineCallbacks
+    def _cb_getFileAlias_swift(
+            self,
+            (dbcontentID, dbfilename, mimetype, date_created, size,
+                restricted),
+            filename, request
+            ):
+        # Return a 404 if the filename in the URL is incorrect. This offers
+        # a crude form of access control (stuff we care about can have
+        # unguessable names effectively using the filename as a secret).
+        if dbfilename.encode('utf-8') != filename:
+            log.msg(
+                "404: dbfilename.encode('utf-8') != filename: %r != %r"
+                % (dbfilename.encode('utf-8'), filename))
+            defer.returnValue(fourOhFour)
+
+        if not restricted:
+            # Set our caching headers. Librarian files can be cached forever.
+            request.setHeader('Cache-Control', 'max-age=31536000, public')
+        else:
+            # Restricted files require revalidation every time. For now,
+            # until the deployment details are completely reviewed, the
+            # simplest, most cautious approach is taken: no caching permitted.
+            request.setHeader('Cache-Control', 'max-age=0, private')
+
+        stream = yield self.storage.open(dbcontentID)
+        if stream is not None or self.upstreamHost is None:
+            # XXX: Brad Crittenden 2007-12-05 bug=174204: When encodings are
+            # stored as part of a file's metadata this logic will be replaced.
+            encoding, mimetype = guess_librarian_encoding(filename, mimetype)
+            defer.returnValue(File_swift(
+                mimetype, encoding, date_created, stream, size))
+        else:
+            defer.returnValue(
+                proxy.ReverseProxyResource(
+                    self.upstreamHost, self.upstreamPort, request.path))
+
     def render_GET(self, request):
         return defaultResource.render(request)
 
@@ -203,6 +260,75 @@
         return self._modification_time
 
 
+class File_swift(static.File):
+    isLeaf = True
+
+    def __init__(
+        self, contentType, encoding, modification_time, stream, size):
+        # Have to convert the UTC datetime to POSIX timestamp (localtime)
+        offset = datetime.utcnow() - datetime.now()
+        local_modification_time = modification_time - offset
+        self._modification_time = time.mktime(
+            local_modification_time.timetuple())
+        static.File.__init__(self, '.')
+        self.type = contentType
+        self.encoding = encoding
+        self.stream = stream
+        self.size = size
+
+    def getModificationTime(self):
+        """Override the time on disk with the time from the database.
+
+        This is used by twisted to set the Last-Modified: header.
+        """
+        return self._modification_time
+
+    def restat(self, reraise=True):
+        return  # Noop
+
+    def getsize(self):
+        return self.size
+
+    def exists(self):
+        return self.stream is not None
+
+    def isdir(self):
+        return False
+
+    def openForReading(self):
+        return self.stream
+
+    def makeProducer(self, request, fileForReading):
+        # Unfortunately, by overriding the static.File's more
+        # complex makeProducer method we lose HTTP range support.
+        # However, this seems the only sane way of coping with the fact
+        # that sucking data in from Swift requires a Deferred and the
+        # static.*Producer implementations don't cope. This shouldn't be
+        # a problem as the Librarian sits behind Squid. If it is, I
+        # think we will need to cargo-cult three Producer
+        # implementations in static, making the small modification to
+        # cope with self.fileObject.read maybe returning a Deferred, and
+        # the static.File.makeProducer method to return the correct
+        # producer.
+        self._setContentHeaders(request)
+        request.setResponseCode(http.OK)
+        return FileProducer(request, fileForReading)
+
+
+class FileProducer(static.NoRangeStaticProducer):
+    @defer.inlineCallbacks
+    def resumeProducing(self):
+        if not self.request:
+            return
+        data = yield self.fileObject.read(self.bufferSize)
+        if data:
+            self.request.write(data)
+        else:
+            self.request.unregisterProducer()
+            self.request.finish()
+            self.stopProducing()
+
+
 class DigestSearchResource(resource.Resource):
     def __init__(self, storage):
         self.storage = storage

=== added file 'lib/lp/services/twistedsupport/features.py'
--- lib/lp/services/twistedsupport/features.py	1970-01-01 00:00:00 +0000
+++ lib/lp/services/twistedsupport/features.py	2013-08-15 13:52:53 +0000
@@ -0,0 +1,52 @@
+# Copyright 2013 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Feature flags for Twisted.
+
+Flags are refreshed asynchronously at regular intervals.
+"""
+
+__metaclass__ = type
+__all__ = ['setup_feature_controller']
+
+from twisted.internet import (
+    defer,
+    reactor,
+    )
+from twisted.internet.threads import deferToThread
+
+from lp.services.database import read_transaction
+from lp.services.features import (
+    getFeatureFlag,
+    install_feature_controller,
+    make_script_feature_controller,
+    )
+
+
+def setup_feature_controller(script_name):
+    '''Install the FeatureController and schedule regular updates.
+
+    Update interval is specified by the twisted.flags.refresh
+    feature flag, defaulting to 30 seconds.
+    '''
+    controller = _new_controller(script_name)
+    _install_and_reschedule(controller, script_name)
+
+
+@defer.inlineCallbacks
+def update(script_name):
+    controller = yield deferToThread(_new_controller, script_name)
+    _install_and_reschedule(controller, script_name)
+
+
+@read_transaction
+def _new_controller(script_name):
+    controller = make_script_feature_controller(script_name)
+    controller.getAllFlags()  # Pull everything so future lookups don't block.
+    return controller
+
+
+def _install_and_reschedule(controller, script_name):
+    install_feature_controller(controller)
+    reactor.callLater(
+        getFeatureFlag('twisted.flags.refresh') or 30, update, script_name)

=== modified file 'lib/lp/testing/layers.py'
--- lib/lp/testing/layers.py	2013-08-15 13:52:52 +0000
+++ lib/lp/testing/layers.py	2013-06-20 05:50:00 +0000
@@ -38,7 +38,6 @@
     'LibrarianLayer',
     'PageTestLayer',
     'RabbitMQLayer',
-    'SwiftLayer',
     'TwistedAppServerLayer',
     'TwistedLaunchpadZopelessLayer',
     'TwistedLayer',
@@ -148,7 +147,6 @@
     reset_logging,
     )
 from lp.testing.pgsql import PgTestSetup
-from lp.testing.swift.fixture import SwiftFixture
 from lp.testing.smtpd import SMTPController
 
 
@@ -824,22 +822,6 @@
         return cls._db_fixture.dropDb()
 
 
-class SwiftLayer(BaseLayer):
-    @classmethod
-    @profiled
-    def setUp(cls):
-        cls.swift_fixture = SwiftFixture()
-        cls.swift_fixture.setUp()
-
-    @classmethod
-    @profiled
-    def tearDown(cls):
-        swift = cls.swift_fixture
-        if swift is not None:
-            cls.swift_fixture = None
-            swift.cleanUp()
-
-
 class LibrarianLayer(DatabaseLayer):
     """Provides tests access to a Librarian instance.
 

=== modified file 'lib/lp/testing/swift/fixture.py'
--- lib/lp/testing/swift/fixture.py	2013-08-15 13:52:52 +0000
+++ lib/lp/testing/swift/fixture.py	2013-08-15 13:52:53 +0000
@@ -12,7 +12,7 @@
 import tempfile
 import time
 
-from fixtures import FunctionFixture
+from fixtures import EnvironmentVariableFixture, FunctionFixture
 from s4 import hollow
 from swiftclient import client as swiftclient
 import testtools.content
@@ -48,13 +48,21 @@
             os.path.join(config.root, 'bin', 'py'),
             os.path.join(config.root, 'bin', 'twistd'))
 
-    def cleanUp(self):
-        if self.logfile is not None and os.path.exists(self.logfile):
-            self.addDetail(
-                'log-file', testtools.content.content_from_stream(
-                    open(self.logfile, 'r'), testtools.content_type.UTF8_TEXT))
-            os.unlink(self.logfile)
-        super(SwiftFixture, self).cleanUp()
+        logfile = self.logfile
+        self.addCleanup(lambda: os.path.exists(logfile) and os.unlink(logfile))
+
+        testtools.content.attach_file(
+            self, logfile, 'swift-log', testtools.content_type.UTF8_TEXT)
+
+        self.useFixture(EnvironmentVariableFixture(
+            'OS_AUTH_URL',
+            'http://localhost:{}/keystone/v2.0/'.format(self.daemon_port)))
+        self.useFixture(EnvironmentVariableFixture(
+            'OS_USERNAME', hollow.DEFAULT_USERNAME))
+        self.useFixture(EnvironmentVariableFixture(
+            'OS_PASSWORD', hollow.DEFAULT_PASSWORD))
+        self.useFixture(EnvironmentVariableFixture(
+            'OS_TENANT_NAME', hollow.DEFAULT_TENANT_NAME))
 
     def setUpRoot(self):
         # Create a root directory.
@@ -69,15 +77,14 @@
         os.environ['HOLLOW_ROOT'] = self.root
         os.environ['HOLLOW_PORT'] = str(self.daemon_port)
 
-    def connect(
-        self, tenant_name=hollow.DEFAULT_TENANT_NAME,
-        username=hollow.DEFAULT_USERNAME, password=hollow.DEFAULT_PASSWORD):
+    def connect(self):
         """Return a valid connection to our mock Swift"""
-        port = self.daemon_port
         client = swiftclient.Connection(
-            authurl="http://localhost:%d/keystone/v2.0/" % port,
-            auth_version="2.0", tenant_name=tenant_name,
-            user=username, key=password,
+            authurl=os.environ.get('OS_AUTH_URL', None),
+            auth_version="2.0",
+            tenant_name=os.environ.get('OS_TENANT_NAME', None),
+            user=os.environ.get('OS_USERNAME', None),
+            key=os.environ.get('OS_PASSWORD', None),
             retries=0, insecure=True)
         return client
 
@@ -85,6 +92,6 @@
         self.setUp()
 
     def shutdown(self):
-        self.cleanUp()
+        self.killTac()
         while self._hasDaemonStarted():
             time.sleep(0.1)

=== modified file 'lib/lp/testing/swift/tests/test_fixture.py'
--- lib/lp/testing/swift/tests/test_fixture.py	2013-08-15 13:52:52 +0000
+++ lib/lp/testing/swift/tests/test_fixture.py	2013-08-15 13:52:53 +0000
@@ -7,7 +7,9 @@
 __all__ = []
 
 import httplib
+import os
 
+from s4 import hollow
 from swiftclient import client as swiftclient
 
 from lp.testing import TestCase
@@ -73,3 +75,12 @@
         client = self.swift_fixture.connect()
         headers, body = client.get_object("size", str(size))
         self.assertEquals(body, "0" * size)
+
+    def test_env(self):
+        self.assertEqual(hollow.DEFAULT_USERNAME, os.environ['OS_USERNAME'])
+        self.assertEqual(hollow.DEFAULT_PASSWORD, os.environ['OS_PASSWORD'])
+        self.assertEqual(
+            'http://localhost:{}/keystone/v2.0/'.format(
+                self.swift_fixture.daemon_port), os.environ['OS_AUTH_URL'])
+        self.assertEqual(
+            hollow.DEFAULT_TENANT_NAME, os.environ['OS_TENANT_NAME'])

=== modified file 'lib/lp_sitecustomize.py'
--- lib/lp_sitecustomize.py	2012-08-29 12:06:41 +0000
+++ lib/lp_sitecustomize.py	2013-08-15 13:52:53 +0000
@@ -10,6 +10,7 @@
 import os
 import warnings
 
+from swiftclient import client as swiftclient
 from twisted.internet.defer import (
     Deferred,
     DeferredList,
@@ -87,6 +88,17 @@
     bzr_logger.propagate = False
 
 
+def silence_swiftclient_logger():
+    """Remove debug noise generated by swiftclient and keystoneclient.
+
+    swiftclient explicitly checks if debug messages are enabled on its
+    Logger, which is unfortunate as we disable them in the handler. Not
+    only does swiftclient then emit lots of noise, but it also turns
+    keystoneclient debugging on.
+    """
+    swiftclient.logger.setLevel(logging.INFO)
+
+
 def silence_zcml_logger():
     """Lower level of ZCML parsing DEBUG messages."""
     config_filter = MappingFilter(
@@ -151,6 +163,7 @@
     silence_bzr_logger()
     silence_zcml_logger()
     silence_transaction_logger()
+    silence_swiftclient_logger()
 
 
 def customize_get_converter(zope_publisher_browser=zope.publisher.browser):

=== modified file 'setup.py' (properties changed: +x to -x)
=== modified file 'versions.cfg'
--- versions.cfg	2013-08-15 13:52:52 +0000
+++ versions.cfg	2013-08-15 13:52:53 +0000
@@ -109,7 +109,7 @@
 # to build: python setup.py sdist
 rabbitfixture = 0.3.3-lp-4
 requests = 1.2.3
-s4 = 0.1.1
+s4 = 0.1.2
 setproctitle = 1.1.7
 setuptools-git = 1.0
 simplejson = 3.1.3


Follow ups