← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~cjwatson/launchpad:parallel-feed-swift into launchpad:master


Colin Watson has proposed merging ~cjwatson/launchpad:parallel-feed-swift into launchpad:master.

Commit message:
Support running parallel instances of librarian-feed-swift

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1599001 in Launchpad itself: "librarian-feed-swift needs to upload faster (parallel) at times to keep up"

For more details, see:

This makes it easier to keep up under higher load.
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:parallel-feed-swift into launchpad:master.
diff --git a/cronscripts/librarian-feed-swift.py b/cronscripts/librarian-feed-swift.py
index de244c2..9da2c45 100755
--- a/cronscripts/librarian-feed-swift.py
+++ b/cronscripts/librarian-feed-swift.py
@@ -47,6 +47,24 @@ class LibrarianFeedSwift(LaunchpadCronScript):
             default=None, metavar="INTERVAL",
             help="Don't migrate files older than INTERVAL "
                  "(PostgreSQL syntax)")
+        self.parser.add_option(
+            "--instance-id", action="store", type=int, default=None,
+            metavar="INSTANCE_ID",
+            help=(
+                "Run as instance INSTANCE_ID (starting at 0) out of "
+                "NUM_INSTANCES parallel workers"))
+        self.parser.add_option(
+            "--num-instances", action="store", type=int, default=None,
+            metavar="NUM_INSTANCES",
+            help="Run NUM_INSTANCES parallel workers")
+    @property
+    def lockfilename(self):
+        if self.options.instance_id is not None:
+            return "launchpad-%s-%d.lock" % (
+                self.name, self.options.instance_id)
+        else:
+            return "launchpad-%s.lock" % self.name
     def main(self):
         if self.options.rename and self.options.remove:
@@ -72,17 +90,31 @@ class LibrarianFeedSwift(LaunchpadCronScript):
                     - CAST(%s AS INTERVAL)
                 """, (six.text_type(self.options.end_at),)).get_one()[0]
+        if ((self.options.instance_id is None) !=
+                (self.options.num_instances is None)):
+            self.parser.error(
+                "Must specify both or neither of --instance-id and "
+                "--num-instances")
+        kwargs = {
+            "instance_id": self.options.instance_id,
+            "num_instances": self.options.num_instances,
+            "remove_func": remove,
+            }
         if self.options.ids and (self.options.start or self.options.end):
                 "Cannot specify both individual file(s) and range")
         elif self.options.ids:
             for lfc in self.options.ids:
-                swift.to_swift(self.logger, lfc, lfc, remove)
+                swift.to_swift(
+                    self.logger, start_lfc_id=lfc, end_lfc_id=lfc, **kwargs)
-            swift.to_swift(self.logger, self.options.start,
-                           self.options.end, remove)
+            swift.to_swift(
+                self.logger, start_lfc_id=self.options.start,
+                end_lfc_id=self.options.end, **kwargs)
diff --git a/lib/lp/services/librarianserver/swift.py b/lib/lp/services/librarianserver/swift.py
index b45293f..23344c0 100644
--- a/lib/lp/services/librarianserver/swift.py
+++ b/lib/lp/services/librarianserver/swift.py
@@ -56,12 +56,17 @@ def quiet_swiftclient(func, *args, **kwargs):
         swiftclient.logger.disabled = old_disabled
-def to_swift(log, start_lfc_id=None, end_lfc_id=None, remove_func=False):
+def to_swift(log, start_lfc_id=None, end_lfc_id=None,
+             instance_id=None, num_instances=None, remove_func=False):
     '''Copy a range of Librarian files from disk into Swift.
     start and end identify the range of LibraryFileContent.id to
     migrate (inclusive).
+    If instance_id and num_instances are set, only process files whose ID
+    has remainder instance_id when divided by num_instances.  This allows
+    running multiple feeders in parallel.
     If remove_func is set, it is called for every file after being copied into
@@ -76,6 +81,9 @@ def to_swift(log, start_lfc_id=None, end_lfc_id=None, remove_func=False):
     log.info("Walking disk store {0} from {1} to {2}, inclusive".format(
         fs_root, start_lfc_id, end_lfc_id))
+    if instance_id is not None and num_instances is not None:
+        log.info("Parallel mode: instance ID {0} of {1}".format(
+            instance_id, num_instances))
     start_fs_path = filesystem_path(start_lfc_id)
     end_fs_path = filesystem_path(end_lfc_id)
@@ -134,6 +142,9 @@ def to_swift(log, start_lfc_id=None, end_lfc_id=None, remove_func=False):
             except ValueError:
                 log.warning('Invalid hex fail, skipping {0}'.format(fs_path))
+            if instance_id is not None and num_instances is not None:
+                if (lfc % num_instances) != instance_id:
+                    continue
             log.debug('Found {0} ({1})'.format(lfc, filename))
@@ -175,6 +186,8 @@ def to_swift(log, start_lfc_id=None, end_lfc_id=None, remove_func=False):
             if remove_func:
+    connection_pool.put(swift_connection)
 def rename(path):
     # It would be nice to move the file out of the tree entirely, but we
diff --git a/lib/lp/services/librarianserver/tests/test_swift.py b/lib/lp/services/librarianserver/tests/test_swift.py
index 98ebc86..b891b0d 100644
--- a/lib/lp/services/librarianserver/tests/test_swift.py
+++ b/lib/lp/services/librarianserver/tests/test_swift.py
@@ -289,6 +289,47 @@ class TestFeedSwift(TestCase):
         # Our object round tripped
         self.assertEqual(obj1 + obj2 + obj3, expected_content)
+    def test_multiple_instances(self):
+        log = BufferLogger()
+        # Confirm that files exist on disk where we expect to find them.
+        for lfc in self.lfcs:
+            path = swift.filesystem_path(lfc.id)
+            self.assertTrue(os.path.exists(path))
+        # Migrate all the files into Swift, using multiple instances.  For
+        # each instance, only the matching files are processed.
+        for instance_id in range(3):
+            swift.to_swift(
+                log, instance_id=instance_id, num_instances=3,
+                remove_func=os.unlink)
+            # Only the matching files are gone from disk.  (This is
+            # cumulative, so we test for all instances up to this point.)
+            for lfc in self.lfcs:
+                exists = os.path.exists(swift.filesystem_path(lfc.id))
+                if (lfc.id % 3) <= instance_id:
+                    self.assertFalse(exists)
+                else:
+                    self.assertTrue(exists)
+            # Confirm all the files that have been migrated so far are in
+            # Swift.
+            swift_client = self.swift_fixture.connect()
+            try:
+                for lfc, contents in zip(self.lfcs, self.contents):
+                    container, name = swift.swift_location(lfc.id)
+                    if (lfc.id % 3) <= instance_id:
+                        headers, obj = swift_client.get_object(container, name)
+                        self.assertEqual(contents, obj, 'Did not round trip')
+                    else:
+                        self.assertRaises(
+                            swiftclient.ClientException,
+                            swift.quiet_swiftclient,
+                            swift_client.get_object, container, name)
+            finally:
+                swift_client.close()
 class TestHashStream(TestCase):
     layer = BaseLayer