← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~pappacena/launchpad:mirror-prober-parallelism-control into launchpad:master

 

Thiago F. Pappacena has proposed merging ~pappacena/launchpad:mirror-prober-parallelism-control into launchpad:master.

Commit message:
Add options to the distribution mirror prober so that we can easily control the maximum number of requests it issues in parallel.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/380777

This change removes the two hardcoded constants that control the number of parallel requests (overall and per host) and replaces them with two command-line options on the script.

The major change is that RequestManager now creates the overall semaphore in __init__ instead of as a class-level attribute, and that the same RequestManager instance is shared between probe_archive_mirror and updateMirrorFreshness.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:mirror-prober-parallelism-control into launchpad:master.
diff --git a/cronscripts/distributionmirror-prober.py b/cronscripts/distributionmirror-prober.py
index 6d38880..e02d3cb 100755
--- a/cronscripts/distributionmirror-prober.py
+++ b/cronscripts/distributionmirror-prober.py
@@ -37,6 +37,15 @@ class DistroMirrorProberScript(LaunchpadCronScript):
         self.parser.add_option('--max-mirrors',
             dest='max_mirrors', default=None, action='store', type="int",
             help='Only probe N mirrors.')
+        self.parser.add_option('--max-parallel',
+            dest='max_parallel', default=100,
+            action='store', type="int",
+            help='Keep maximum N parallel requests at a time (default=100).')
+        self.parser.add_option('--max-parallel-per-host',
+            dest='max_parallel_per_host', default=2,
+            action='store', type="int",
+            help='Keep maximum N parallel requests per host at a time.'
+                 ' (default=2)')
 
     def main(self):
         if self.options.content_type == 'archive':
@@ -52,7 +61,8 @@ class DistroMirrorProberScript(LaunchpadCronScript):
             lambda: config.distributionmirrorprober.timeout)
         DistroMirrorProber(self.txn, self.logger).probe(
             content_type, self.options.no_remote_hosts, self.options.force,
-            self.options.max_mirrors, not self.options.no_owner_notification)
+            self.options.max_mirrors, not self.options.no_owner_notification,
+            self.options.max_parallel, self.options.max_parallel_per_host)
 
 
 if __name__ == '__main__':
diff --git a/lib/lp/registry/scripts/distributionmirror_prober.py b/lib/lp/registry/scripts/distributionmirror_prober.py
index 5b5d424..a6350ca 100644
--- a/lib/lp/registry/scripts/distributionmirror_prober.py
+++ b/lib/lp/registry/scripts/distributionmirror_prober.py
@@ -81,17 +81,6 @@ invalid_certificate_hosts = set()
 
 MAX_REDIRECTS = 3
 
-# Number of simultaneous requests we issue on a given host.
-# IMPORTANT: Don't change this unless you really know what you're doing. Using
-# a too big value can cause spurious failures on lots of mirrors and a too
-# small one can cause the prober to run for hours.
-PER_HOST_REQUESTS = 2
-
-# We limit the overall number of simultaneous requests as well to prevent
-# them from stalling and timing out before they even get a chance to
-# start connecting.
-OVERALL_REQUESTS = 100
-
 
 class LoggingMixin:
     """Common logging class for archive and releases mirror messages."""
@@ -110,12 +99,15 @@ class LoggingMixin:
 
 class RequestManager:
 
-    overall_semaphore = DeferredSemaphore(OVERALL_REQUESTS)
-
     # Yes, I want a mutable class attribute because I want changes done in an
     # instance to be visible in other instances as well.
     host_locks = {}
 
+    def __init__(self, max_parallel, max_parallel_per_host):
+        self.max_parallel = max_parallel
+        self.max_parallel_per_host = max_parallel_per_host
+        self.overall_semaphore = DeferredSemaphore(max_parallel)
+
     def run(self, host, probe_func):
         # Use a MultiLock with one semaphore limiting the overall
         # connections and another limiting the per-host connections.
@@ -123,7 +115,8 @@ class RequestManager:
             multi_lock = self.host_locks[host]
         else:
             multi_lock = MultiLock(
-                self.overall_semaphore, DeferredSemaphore(PER_HOST_REQUESTS))
+                self.overall_semaphore,
+                DeferredSemaphore(self.max_parallel_per_host))
             self.host_locks[host] = multi_lock
         return multi_lock.run(probe_func)
 
@@ -639,7 +632,7 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
         self.logMessage(msg)
         return mirror
 
-    def updateMirrorFreshness(self, arch_or_source_mirror):
+    def updateMirrorFreshness(self, arch_or_source_mirror, request_manager):
         """Update the freshness of this MirrorDistro{ArchSeries,SeriesSource}.
 
         This is done by issuing HTTP HEAD requests on that mirror looking for
@@ -664,7 +657,6 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
             self.deleteMethod(self.series, self.pocket, self.component)
             return
 
-        request_manager = RequestManager()
         deferredList = []
         # We start setting the freshness to unknown, and then we move on
         # trying to find one of the recently published packages mirrored
@@ -836,7 +828,8 @@ def checkComplete(result, key, unchecked_keys):
     return result
 
 
-def probe_archive_mirror(mirror, logfile, unchecked_keys, logger):
+def probe_archive_mirror(mirror, logfile, unchecked_keys, logger,
+                         max_parallel, max_parallel_per_host):
     """Probe an archive mirror for its contents and freshness.
 
     First we issue a set of HTTP HEAD requests on some key files to find out
@@ -850,7 +843,7 @@ def probe_archive_mirror(mirror, logfile, unchecked_keys, logger):
     packages_paths = mirror.getExpectedPackagesPaths()
     sources_paths = mirror.getExpectedSourcesPaths()
     all_paths = itertools.chain(packages_paths, sources_paths)
-    request_manager = RequestManager()
+    request_manager = RequestManager(max_parallel, max_parallel_per_host)
     for series, pocket, component, path in all_paths:
         url = urljoin(base_url, path)
         callbacks = ArchiveMirrorProberCallbacks(
@@ -864,13 +857,14 @@ def probe_archive_mirror(mirror, logfile, unchecked_keys, logger):
         deferred.addCallbacks(
             callbacks.ensureMirrorSeries, callbacks.deleteMirrorSeries)
 
-        deferred.addCallback(callbacks.updateMirrorFreshness)
+        deferred.addCallback(callbacks.updateMirrorFreshness, request_manager)
         deferred.addErrback(logger.error)
 
         deferred.addBoth(checkComplete, url, unchecked_keys)
 
 
-def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger):
+def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger,
+                         max_parallel, max_parallel_per_host):
     """Probe a cdimage mirror for its contents.
 
     This is done by checking the list of files for each flavour and series
@@ -900,7 +894,7 @@ def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger):
         mirror_key = (series, flavour)
         unchecked_keys.append(mirror_key)
         deferredList = []
-        request_manager = RequestManager()
+        request_manager = RequestManager(max_parallel, max_parallel_per_host)
         for path in paths:
             url = urljoin(base_url, path)
             # Use a RedirectAwareProberFactory because CD mirrors are allowed
@@ -971,9 +965,17 @@ class DistroMirrorProber:
         mirror.newProbeRecord(log_file)
 
     def probe(self, content_type, no_remote_hosts, ignore_last_probe,
-              max_mirrors, notify_owner):
+              max_mirrors, notify_owner, max_parallel=100,
+              max_parallel_per_host=2):
         """Probe distribution mirrors.
 
+        You should control carefully the parallelism here. Increasing too
+        much the number of max_parallel_per_host could make the mirrors take
+        too much to answer or deny our requests.
+
+        If we increase too much the max_parallel, we might experience timeouts
+        because of our production proxy or internet bandwidth.
+
         :param content_type: The type of mirrored content, as a
             `MirrorContent`.
         :param no_remote_hosts: If True, restrict access to localhost.
@@ -983,6 +985,10 @@ class DistroMirrorProber:
             no maximum.
         :param notify_owner: Send failure notification to the owners of the
             mirrors.
+        :param max_parallel: Maximum number of requests happening
+            simultaneously.
+        :param max_parallel_per_host: Maximum number of requests to the same
+            host happening simultaneously.
         """
         if content_type == MirrorContent.ARCHIVE:
             probe_function = probe_archive_mirror
@@ -1033,7 +1039,8 @@ class DistroMirrorProber:
             probed_mirrors.append(mirror)
             logfile = StringIO()
             logfiles[mirror_id] = logfile
-            probe_function(mirror, logfile, unchecked_keys, self.logger)
+            probe_function(mirror, logfile, unchecked_keys, self.logger,
+                           max_parallel, max_parallel_per_host)
 
         if probed_mirrors:
             reactor.run()
diff --git a/lib/lp/registry/tests/test_distributionmirror_prober.py b/lib/lp/registry/tests/test_distributionmirror_prober.py
index 82db551..c674ac0 100644
--- a/lib/lp/registry/tests/test_distributionmirror_prober.py
+++ b/lib/lp/registry/tests/test_distributionmirror_prober.py
@@ -56,8 +56,6 @@ from lp.registry.scripts.distributionmirror_prober import (
     MIN_REQUESTS_TO_CONSIDER_RATIO,
     MirrorCDImageProberCallbacks,
     MultiLock,
-    OVERALL_REQUESTS,
-    PER_HOST_REQUESTS,
     probe_archive_mirror,
     probe_cdimage_mirror,
     ProberFactory,
@@ -1052,7 +1050,7 @@ class TestProbeFunctionSemaphores(TestCase):
         # Note that calling this function won't actually probe any mirrors; we
         # need to call reactor.run() to actually start the probing.
         with default_timeout(15.0):
-            probe_cdimage_mirror(mirror, StringIO(), [], logging)
+            probe_cdimage_mirror(mirror, StringIO(), [], logging, 100, 2)
         self.assertEqual(0, len(mirror.cdimage_series))
 
     def test_archive_mirror_probe_function(self):
@@ -1081,43 +1079,45 @@ class TestProbeFunctionSemaphores(TestCase):
         The given probe_function must be either probe_cdimage_mirror or
         probe_archive_mirror.
         """
-        request_manager = RequestManager()
+        max_per_host_requests = 2
+        max_requests = 100
+        request_manager = RequestManager(max_requests, max_per_host_requests)
         mirror1_host = URI(mirror1.base_url).host
         mirror2_host = URI(mirror2.base_url).host
         mirror3_host = URI(mirror3.base_url).host
 
-        probe_function(mirror1, StringIO(), [], logging)
+        probe_function(mirror1, StringIO(), [], logging, 100, 2)
         # Since we have a single mirror to probe we need to have a single
-        # DeferredSemaphore with a limit of PER_HOST_REQUESTS, to ensure we
+        # DeferredSemaphore with a limit of max_per_host_requests, to ensure we
         # don't issue too many simultaneous connections on that host.
         self.assertEqual(len(request_manager.host_locks), 1)
         multi_lock = request_manager.host_locks[mirror1_host]
-        self.assertEqual(multi_lock.host_lock.limit, PER_HOST_REQUESTS)
+        self.assertEqual(multi_lock.host_lock.limit, max_per_host_requests)
         # Note that our multi_lock contains another semaphore to control the
         # overall number of requests.
-        self.assertEqual(multi_lock.overall_lock.limit, OVERALL_REQUESTS)
+        self.assertEqual(multi_lock.overall_lock.limit, max_requests)
 
-        probe_function(mirror2, StringIO(), [], logging)
+        probe_function(mirror2, StringIO(), [], logging, 100, 2)
         # Now we have two mirrors to probe, but they have the same hostname,
         # so we'll still have a single semaphore in host_semaphores.
         self.assertEqual(mirror2_host, mirror1_host)
         self.assertEqual(len(request_manager.host_locks), 1)
         multi_lock = request_manager.host_locks[mirror2_host]
-        self.assertEqual(multi_lock.host_lock.limit, PER_HOST_REQUESTS)
+        self.assertEqual(multi_lock.host_lock.limit, max_per_host_requests)
 
-        probe_function(mirror3, StringIO(), [], logging)
+        probe_function(mirror3, StringIO(), [], logging, 100, 2)
         # This third mirror is on a separate host, so we'll have a second
         # semaphore added to host_semaphores.
         self.assertTrue(mirror3_host != mirror1_host)
         self.assertEqual(len(request_manager.host_locks), 2)
         multi_lock = request_manager.host_locks[mirror3_host]
-        self.assertEqual(multi_lock.host_lock.limit, PER_HOST_REQUESTS)
+        self.assertEqual(multi_lock.host_lock.limit, max_per_host_requests)
 
         # When using an http_proxy, even though we'll actually connect to the
         # proxy, we'll use the mirror's host as the key to find the semaphore
         # that should be used
         self.pushConfig('launchpad', http_proxy='http://squid.internal:3128/')
-        probe_function(mirror3, StringIO(), [], logging)
+        probe_function(mirror3, StringIO(), [], logging, 100, 2)
         self.assertEqual(len(request_manager.host_locks), 2)