launchpad-reviewers team mailing list archive - Message #24477
[Merge] ~pappacena/launchpad:mirror-prober-parallelism-control into launchpad:master
Thiago F. Pappacena has proposed merging ~pappacena/launchpad:mirror-prober-parallelism-control into launchpad:master.
Commit message:
Add options to the distribution mirror prober so we can easily control the maximum number of parallel requests it issues.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/380777
This change removes the two hardcoded constants that control the number of parallel requests (overall and per host) and turns them into two command-line parameters on the script.
The main structural change is that RequestManager now creates its locks in __init__ instead of relying on a global one, and the same RequestManager instance is shared between probe_archive_mirror and updateMirrorFreshness.
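For review purposes, here is a minimal, self-contained sketch of the locking shape this introduces. It is not the Launchpad code itself: the names SketchRequestManager and probe_func are illustrative, and the real RequestManager combines the two semaphores with Launchpad's MultiLock helper rather than nesting .run() calls as below.

    # Simplified sketch of the parallelism control, using only Twisted's
    # DeferredSemaphore; the real prober wraps the overall and per-host
    # semaphores in a MultiLock instead of nesting .run() calls.
    from twisted.internet.defer import DeferredSemaphore

    class SketchRequestManager:
        """Hypothetical, simplified stand-in for the prober's RequestManager."""

        def __init__(self, max_parallel, max_parallel_per_host):
            # The overall semaphore is now created per instance in __init__,
            # replacing the old module-level OVERALL_REQUESTS constant.
            self.overall_semaphore = DeferredSemaphore(max_parallel)
            self.max_parallel_per_host = max_parallel_per_host
            self.host_locks = {}

        def run(self, host, probe_func):
            # Create one per-host semaphore lazily, then run probe_func while
            # holding both an overall slot and a per-host slot (acquiring the
            # overall slot first, a simplification of what MultiLock does).
            host_lock = self.host_locks.setdefault(
                host, DeferredSemaphore(self.max_parallel_per_host))
            return self.overall_semaphore.run(host_lock.run, probe_func)

With the new options, each probed mirror gets its own manager, roughly RequestManager(max_parallel, max_parallel_per_host) with the values coming from --max-parallel and --max-parallel-per-host, and the same instance is passed down so that updateMirrorFreshness reuses it instead of creating a new one.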
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:mirror-prober-parallelism-control into launchpad:master.
diff --git a/cronscripts/distributionmirror-prober.py b/cronscripts/distributionmirror-prober.py
index 6d38880..e02d3cb 100755
--- a/cronscripts/distributionmirror-prober.py
+++ b/cronscripts/distributionmirror-prober.py
@@ -37,6 +37,15 @@ class DistroMirrorProberScript(LaunchpadCronScript):
self.parser.add_option('--max-mirrors',
dest='max_mirrors', default=None, action='store', type="int",
help='Only probe N mirrors.')
+ self.parser.add_option('--max-parallel',
+ dest='max_parallel', default=100,
+ action='store', type="int",
+ help='Keep maximum N parallel requests at a time (default=100).')
+ self.parser.add_option('--max-parallel-per-host',
+ dest='max_parallel_per_host', default=2,
+ action='store', type="int",
+ help='Keep maximum N parallel requests per host at a time.'
+ ' (default=2)')
def main(self):
if self.options.content_type == 'archive':
@@ -52,7 +61,8 @@ class DistroMirrorProberScript(LaunchpadCronScript):
lambda: config.distributionmirrorprober.timeout)
DistroMirrorProber(self.txn, self.logger).probe(
content_type, self.options.no_remote_hosts, self.options.force,
- self.options.max_mirrors, not self.options.no_owner_notification)
+ self.options.max_mirrors, not self.options.no_owner_notification,
+ self.options.max_parallel, self.options.max_parallel_per_host)
if __name__ == '__main__':
diff --git a/lib/lp/registry/scripts/distributionmirror_prober.py b/lib/lp/registry/scripts/distributionmirror_prober.py
index 5b5d424..a6350ca 100644
--- a/lib/lp/registry/scripts/distributionmirror_prober.py
+++ b/lib/lp/registry/scripts/distributionmirror_prober.py
@@ -81,17 +81,6 @@ invalid_certificate_hosts = set()
MAX_REDIRECTS = 3
-# Number of simultaneous requests we issue on a given host.
-# IMPORTANT: Don't change this unless you really know what you're doing. Using
-# a too big value can cause spurious failures on lots of mirrors and a too
-# small one can cause the prober to run for hours.
-PER_HOST_REQUESTS = 2
-
-# We limit the overall number of simultaneous requests as well to prevent
-# them from stalling and timing out before they even get a chance to
-# start connecting.
-OVERALL_REQUESTS = 100
-
class LoggingMixin:
"""Common logging class for archive and releases mirror messages."""
@@ -110,12 +99,15 @@ class LoggingMixin:
class RequestManager:
- overall_semaphore = DeferredSemaphore(OVERALL_REQUESTS)
-
# Yes, I want a mutable class attribute because I want changes done in an
# instance to be visible in other instances as well.
host_locks = {}
+ def __init__(self, max_parallel, max_parallel_per_host):
+ self.max_parallel = max_parallel
+ self.max_parallel_per_host = max_parallel_per_host
+ self.overall_semaphore = DeferredSemaphore(max_parallel)
+
def run(self, host, probe_func):
# Use a MultiLock with one semaphore limiting the overall
# connections and another limiting the per-host connections.
@@ -123,7 +115,8 @@ class RequestManager:
multi_lock = self.host_locks[host]
else:
multi_lock = MultiLock(
- self.overall_semaphore, DeferredSemaphore(PER_HOST_REQUESTS))
+ self.overall_semaphore,
+ DeferredSemaphore(self.max_parallel_per_host))
self.host_locks[host] = multi_lock
return multi_lock.run(probe_func)
@@ -639,7 +632,7 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
self.logMessage(msg)
return mirror
- def updateMirrorFreshness(self, arch_or_source_mirror):
+ def updateMirrorFreshness(self, arch_or_source_mirror, request_manager):
"""Update the freshness of this MirrorDistro{ArchSeries,SeriesSource}.
This is done by issuing HTTP HEAD requests on that mirror looking for
@@ -664,7 +657,6 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
self.deleteMethod(self.series, self.pocket, self.component)
return
- request_manager = RequestManager()
deferredList = []
# We start setting the freshness to unknown, and then we move on
# trying to find one of the recently published packages mirrored
@@ -836,7 +828,8 @@ def checkComplete(result, key, unchecked_keys):
return result
-def probe_archive_mirror(mirror, logfile, unchecked_keys, logger):
+def probe_archive_mirror(mirror, logfile, unchecked_keys, logger,
+ max_parallel, max_parallel_per_host):
"""Probe an archive mirror for its contents and freshness.
First we issue a set of HTTP HEAD requests on some key files to find out
@@ -850,7 +843,7 @@ def probe_archive_mirror(mirror, logfile, unchecked_keys, logger):
packages_paths = mirror.getExpectedPackagesPaths()
sources_paths = mirror.getExpectedSourcesPaths()
all_paths = itertools.chain(packages_paths, sources_paths)
- request_manager = RequestManager()
+ request_manager = RequestManager(max_parallel, max_parallel_per_host)
for series, pocket, component, path in all_paths:
url = urljoin(base_url, path)
callbacks = ArchiveMirrorProberCallbacks(
@@ -864,13 +857,14 @@ def probe_archive_mirror(mirror, logfile, unchecked_keys, logger):
deferred.addCallbacks(
callbacks.ensureMirrorSeries, callbacks.deleteMirrorSeries)
- deferred.addCallback(callbacks.updateMirrorFreshness)
+ deferred.addCallback(callbacks.updateMirrorFreshness, request_manager)
deferred.addErrback(logger.error)
deferred.addBoth(checkComplete, url, unchecked_keys)
-def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger):
+def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger,
+ max_parallel, max_parallel_per_host):
"""Probe a cdimage mirror for its contents.
This is done by checking the list of files for each flavour and series
@@ -900,7 +894,7 @@ def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger):
mirror_key = (series, flavour)
unchecked_keys.append(mirror_key)
deferredList = []
- request_manager = RequestManager()
+ request_manager = RequestManager(max_parallel, max_parallel_per_host)
for path in paths:
url = urljoin(base_url, path)
# Use a RedirectAwareProberFactory because CD mirrors are allowed
@@ -971,9 +965,17 @@ class DistroMirrorProber:
mirror.newProbeRecord(log_file)
def probe(self, content_type, no_remote_hosts, ignore_last_probe,
- max_mirrors, notify_owner):
+ max_mirrors, notify_owner, max_parallel=100,
+ max_parallel_per_host=2):
"""Probe distribution mirrors.
+ The parallelism here should be controlled carefully. Setting
+ max_parallel_per_host too high may make the mirrors too slow to
+ answer, or lead them to deny our requests.
+
+ Setting max_parallel too high may cause timeouts because of our
+ production proxy or internet bandwidth.
+
:param content_type: The type of mirrored content, as a
`MirrorContent`.
:param no_remote_hosts: If True, restrict access to localhost.
@@ -983,6 +985,10 @@ class DistroMirrorProber:
no maximum.
:param notify_owner: Send failure notification to the owners of the
mirrors.
+ :param max_parallel: Maximum number of requests happening
+ simultaneously.
+ :param max_parallel_per_host: Maximum number of requests to the same
+ host happening simultaneously.
"""
if content_type == MirrorContent.ARCHIVE:
probe_function = probe_archive_mirror
@@ -1033,7 +1039,8 @@ class DistroMirrorProber:
probed_mirrors.append(mirror)
logfile = StringIO()
logfiles[mirror_id] = logfile
- probe_function(mirror, logfile, unchecked_keys, self.logger)
+ probe_function(mirror, logfile, unchecked_keys, self.logger,
+ max_parallel, max_parallel_per_host)
if probed_mirrors:
reactor.run()
diff --git a/lib/lp/registry/tests/test_distributionmirror_prober.py b/lib/lp/registry/tests/test_distributionmirror_prober.py
index 82db551..c674ac0 100644
--- a/lib/lp/registry/tests/test_distributionmirror_prober.py
+++ b/lib/lp/registry/tests/test_distributionmirror_prober.py
@@ -56,8 +56,6 @@ from lp.registry.scripts.distributionmirror_prober import (
MIN_REQUESTS_TO_CONSIDER_RATIO,
MirrorCDImageProberCallbacks,
MultiLock,
- OVERALL_REQUESTS,
- PER_HOST_REQUESTS,
probe_archive_mirror,
probe_cdimage_mirror,
ProberFactory,
@@ -1052,7 +1050,7 @@ class TestProbeFunctionSemaphores(TestCase):
# Note that calling this function won't actually probe any mirrors; we
# need to call reactor.run() to actually start the probing.
with default_timeout(15.0):
- probe_cdimage_mirror(mirror, StringIO(), [], logging)
+ probe_cdimage_mirror(mirror, StringIO(), [], logging, 100, 2)
self.assertEqual(0, len(mirror.cdimage_series))
def test_archive_mirror_probe_function(self):
@@ -1081,43 +1079,45 @@ class TestProbeFunctionSemaphores(TestCase):
The given probe_function must be either probe_cdimage_mirror or
probe_archive_mirror.
"""
- request_manager = RequestManager()
+ max_per_host_requests = 2
+ max_requests = 100
+ request_manager = RequestManager(max_requests, max_per_host_requests)
mirror1_host = URI(mirror1.base_url).host
mirror2_host = URI(mirror2.base_url).host
mirror3_host = URI(mirror3.base_url).host
- probe_function(mirror1, StringIO(), [], logging)
+ probe_function(mirror1, StringIO(), [], logging, 100, 2)
# Since we have a single mirror to probe we need to have a single
- # DeferredSemaphore with a limit of PER_HOST_REQUESTS, to ensure we
+ # DeferredSemaphore with a limit of max_per_host_requests, to ensure we
# don't issue too many simultaneous connections on that host.
self.assertEqual(len(request_manager.host_locks), 1)
multi_lock = request_manager.host_locks[mirror1_host]
- self.assertEqual(multi_lock.host_lock.limit, PER_HOST_REQUESTS)
+ self.assertEqual(multi_lock.host_lock.limit, max_per_host_requests)
# Note that our multi_lock contains another semaphore to control the
# overall number of requests.
- self.assertEqual(multi_lock.overall_lock.limit, OVERALL_REQUESTS)
+ self.assertEqual(multi_lock.overall_lock.limit, max_requests)
- probe_function(mirror2, StringIO(), [], logging)
+ probe_function(mirror2, StringIO(), [], logging, 100, 2)
# Now we have two mirrors to probe, but they have the same hostname,
# so we'll still have a single semaphore in host_semaphores.
self.assertEqual(mirror2_host, mirror1_host)
self.assertEqual(len(request_manager.host_locks), 1)
multi_lock = request_manager.host_locks[mirror2_host]
- self.assertEqual(multi_lock.host_lock.limit, PER_HOST_REQUESTS)
+ self.assertEqual(multi_lock.host_lock.limit, max_per_host_requests)
- probe_function(mirror3, StringIO(), [], logging)
+ probe_function(mirror3, StringIO(), [], logging, 100, 2)
# This third mirror is on a separate host, so we'll have a second
# semaphore added to host_semaphores.
self.assertTrue(mirror3_host != mirror1_host)
self.assertEqual(len(request_manager.host_locks), 2)
multi_lock = request_manager.host_locks[mirror3_host]
- self.assertEqual(multi_lock.host_lock.limit, PER_HOST_REQUESTS)
+ self.assertEqual(multi_lock.host_lock.limit, max_per_host_requests)
# When using an http_proxy, even though we'll actually connect to the
# proxy, we'll use the mirror's host as the key to find the semaphore
# that should be used
self.pushConfig('launchpad', http_proxy='http://squid.internal:3128/')
- probe_function(mirror3, StringIO(), [], logging)
+ probe_function(mirror3, StringIO(), [], logging, 100, 2)
self.assertEqual(len(request_manager.host_locks), 2)