launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #26890
[Merge] ~cjwatson/launchpad:parallel-feed-swift into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:parallel-feed-swift into launchpad:master.
Commit message:
Support running parallel instances of librarian-feed-swift
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #1599001 in Launchpad itself: "librarian-feed-swift needs to upload faster (parallel) at times to keep up"
https://bugs.launchpad.net/launchpad/+bug/1599001
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/401127
Running several instances of librarian-feed-swift in parallel makes it easier to keep up under higher load.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:parallel-feed-swift into launchpad:master.
diff --git a/cronscripts/librarian-feed-swift.py b/cronscripts/librarian-feed-swift.py
index de244c2..9da2c45 100755
--- a/cronscripts/librarian-feed-swift.py
+++ b/cronscripts/librarian-feed-swift.py
@@ -47,6 +47,24 @@ class LibrarianFeedSwift(LaunchpadCronScript):
default=None, metavar="INTERVAL",
help="Don't migrate files older than INTERVAL "
"(PostgreSQL syntax)")
+ self.parser.add_option(
+ "--instance-id", action="store", type=int, default=None,
+ metavar="INSTANCE_ID",
+ help=(
+ "Run as instance INSTANCE_ID (starting at 0) out of "
+ "NUM_INSTANCES parallel workers"))
+ self.parser.add_option(
+ "--num-instances", action="store", type=int, default=None,
+ metavar="NUM_INSTANCES",
+ help="Run NUM_INSTANCES parallel workers")
+
+ @property
+ def lockfilename(self):
+ if self.options.instance_id is not None:
+ return "launchpad-%s-%d.lock" % (
+ self.name, self.options.instance_id)
+ else:
+ return "launchpad-%s.lock" % self.name
def main(self):
if self.options.rename and self.options.remove:
@@ -72,17 +90,31 @@ class LibrarianFeedSwift(LaunchpadCronScript):
- CAST(%s AS INTERVAL)
""", (six.text_type(self.options.end_at),)).get_one()[0]
+ if ((self.options.instance_id is None) !=
+ (self.options.num_instances is None)):
+ self.parser.error(
+ "Must specify both or neither of --instance-id and "
+ "--num-instances")
+
+ kwargs = {
+ "instance_id": self.options.instance_id,
+ "num_instances": self.options.num_instances,
+ "remove_func": remove,
+ }
+
if self.options.ids and (self.options.start or self.options.end):
self.parser.error(
"Cannot specify both individual file(s) and range")
elif self.options.ids:
for lfc in self.options.ids:
- swift.to_swift(self.logger, lfc, lfc, remove)
+ swift.to_swift(
+ self.logger, start_lfc_id=lfc, end_lfc_id=lfc, **kwargs)
else:
- swift.to_swift(self.logger, self.options.start,
- self.options.end, remove)
+ swift.to_swift(
+ self.logger, start_lfc_id=self.options.start,
+ end_lfc_id=self.options.end, **kwargs)
self.logger.info('Done')
diff --git a/lib/lp/services/librarianserver/swift.py b/lib/lp/services/librarianserver/swift.py
index b45293f..23344c0 100644
--- a/lib/lp/services/librarianserver/swift.py
+++ b/lib/lp/services/librarianserver/swift.py
@@ -56,12 +56,17 @@ def quiet_swiftclient(func, *args, **kwargs):
swiftclient.logger.disabled = old_disabled
-def to_swift(log, start_lfc_id=None, end_lfc_id=None, remove_func=False):
+def to_swift(log, start_lfc_id=None, end_lfc_id=None,
+ instance_id=None, num_instances=None, remove_func=False):
'''Copy a range of Librarian files from disk into Swift.
start and end identify the range of LibraryFileContent.id to
migrate (inclusive).
+ If instance_id and num_instances are set, only process files whose ID
+ has remainder instance_id when divided by num_instances. This allows
+ running multiple feeders in parallel.
+
If remove_func is set, it is called for every file after being copied into
Swift.
'''
@@ -76,6 +81,9 @@ def to_swift(log, start_lfc_id=None, end_lfc_id=None, remove_func=False):
log.info("Walking disk store {0} from {1} to {2}, inclusive".format(
fs_root, start_lfc_id, end_lfc_id))
+ if instance_id is not None and num_instances is not None:
+ log.info("Parallel mode: instance ID {0} of {1}".format(
+ instance_id, num_instances))
start_fs_path = filesystem_path(start_lfc_id)
end_fs_path = filesystem_path(end_lfc_id)
@@ -134,6 +142,9 @@ def to_swift(log, start_lfc_id=None, end_lfc_id=None, remove_func=False):
except ValueError:
log.warning('Invalid hex fail, skipping {0}'.format(fs_path))
continue
+ if instance_id is not None and num_instances is not None:
+ if (lfc % num_instances) != instance_id:
+ continue
log.debug('Found {0} ({1})'.format(lfc, filename))
@@ -175,6 +186,8 @@ def to_swift(log, start_lfc_id=None, end_lfc_id=None, remove_func=False):
if remove_func:
remove_func(fs_path)
+ connection_pool.put(swift_connection)
+
def rename(path):
# It would be nice to move the file out of the tree entirely, but we
diff --git a/lib/lp/services/librarianserver/tests/test_swift.py b/lib/lp/services/librarianserver/tests/test_swift.py
index 98ebc86..b891b0d 100644
--- a/lib/lp/services/librarianserver/tests/test_swift.py
+++ b/lib/lp/services/librarianserver/tests/test_swift.py
@@ -289,6 +289,47 @@ class TestFeedSwift(TestCase):
# Our object round tripped
self.assertEqual(obj1 + obj2 + obj3, expected_content)
+ def test_multiple_instances(self):
+ log = BufferLogger()
+
+ # Confirm that files exist on disk where we expect to find them.
+ for lfc in self.lfcs:
+ path = swift.filesystem_path(lfc.id)
+ self.assertTrue(os.path.exists(path))
+
+ # Migrate all the files into Swift, using multiple instances. For
+ # each instance, only the matching files are processed.
+ for instance_id in range(3):
+ swift.to_swift(
+ log, instance_id=instance_id, num_instances=3,
+ remove_func=os.unlink)
+
+ # Only the matching files are gone from disk. (This is
+ # cumulative, so we test for all instances up to this point.)
+ for lfc in self.lfcs:
+ exists = os.path.exists(swift.filesystem_path(lfc.id))
+ if (lfc.id % 3) <= instance_id:
+ self.assertFalse(exists)
+ else:
+ self.assertTrue(exists)
+
+ # Confirm all the files that have been migrated so far are in
+ # Swift.
+ swift_client = self.swift_fixture.connect()
+ try:
+ for lfc, contents in zip(self.lfcs, self.contents):
+ container, name = swift.swift_location(lfc.id)
+ if (lfc.id % 3) <= instance_id:
+ headers, obj = swift_client.get_object(container, name)
+ self.assertEqual(contents, obj, 'Did not round trip')
+ else:
+ self.assertRaises(
+ swiftclient.ClientException,
+ swift.quiet_swiftclient,
+ swift_client.get_object, container, name)
+ finally:
+ swift_client.close()
+
class TestHashStream(TestCase):
layer = BaseLayer