launchpad-reviewers team mailing list archive
Message #25743
[Merge] ~pappacena/launchpad:mirror-prober-untwist-db-calls into launchpad:master
Thiago F. Pappacena has proposed merging ~pappacena/launchpad:mirror-prober-untwist-db-calls into launchpad:master.
Commit message:
Move response processing out of the reactor in the mirror prober to avoid spurious HTTP timeouts
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/394494
Ideally, we should keep the reactor free to do the HTTP requests as fast as possible and move all other processing (especially database calls) outside the event loop. This refactoring keeps track of the callbacks being made so that they can be run after the reactor finishes the batch of HTTP requests.
Unfortunately, the archive prober needs the response of an initial HTTP call to do some database operations, and only then fires several further HTTP calls. Since the original bug report is not about the archive prober, that part of the refactoring is left for the future.
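To illustrate the idea, here is a minimal, self-contained sketch of the pattern (not the actual prober code): database work is only recorded while the reactor is running, and is executed after reactor.run() returns. The simplified SketchCallScheduler below drops the mirror/series bookkeeping, and update_freshness plus the probe wiring are hypothetical stand-ins.

# Minimal sketch of the "untwist DB calls" pattern, assuming only that
# Twisted is available. update_freshness is a hypothetical stand-in for
# a database write; it is not part of the real prober code.
from twisted.internet import defer, reactor


class SketchCallScheduler:
    """Record calls so they can be replayed after the reactor stops."""

    def __init__(self):
        self.calls = []

    def sched(self, method, *args, **kwargs):
        # Record a plain call; nothing touches the database yet.
        self.calls.append((method, args, kwargs))

    def schedCallback(self, method, *args, **kwargs):
        # Return a Twisted callback that records the call (including the
        # deferred's result) instead of running it inside the reactor.
        def callback(result):
            self.calls.append((method, (result,) + args, kwargs))
            return result
        return callback

    def run(self):
        # Runs outside the event loop, so slow DB work cannot delay I/O.
        for method, args, kwargs in self.calls:
            method(*args, **kwargs)


def update_freshness(http_status, url):
    # Hypothetical database update, kept out of the reactor on purpose.
    print("would update freshness for %s (HTTP %s)" % (url, http_status))


sched = SketchCallScheduler()
d = defer.succeed(200)  # stands in for an HTTP probe deferred
d.addCallback(sched.schedCallback(update_freshness, "http://mirror.invalid"))
reactor.callLater(0, reactor.stop)
reactor.run()   # the batch of HTTP requests would run here
sched.run()     # the postponed database updates run here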
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:mirror-prober-untwist-db-calls into launchpad:master.
diff --git a/lib/lp/registry/scripts/distributionmirror_prober.py b/lib/lp/registry/scripts/distributionmirror_prober.py
index ca6c4d0..6d2fbf2 100644
--- a/lib/lp/registry/scripts/distributionmirror_prober.py
+++ b/lib/lp/registry/scripts/distributionmirror_prober.py
@@ -582,6 +582,47 @@ class UnknownURLSchemeAfterRedirect(UnknownURLScheme):
"to check this kind of URL." % self.url)
+class CallScheduler:
+ """Keep track of the calls done as callback of deferred or directly,
+ so we can postpone them to after the reactor is done.
+
+ The main limitation for deferred callbacks is that we don't deal with
+ errors here. You should do error handling synchronously on the methods
+ scheduled.
+ """
+ def __init__(self, mirror, series):
+ self.mirror = mirror
+ self.series = series
+ # A list of tuples with the format:
+ # (is_a_callback, callback_result, method, args, kwargs)
+ self.calls = []
+
+ def sched(self, method, *args, **kwargs):
+ self.calls.append((False, None, method, args, kwargs))
+
+ def schedCallback(self, method, *args, **kwargs):
+ def callback(result):
+ self.calls.append((True, result, method, args, kwargs))
+ return result
+ return callback
+
+ def run(self):
+ """Runs all the delayed calls, passing forward the result from one
+ callback to the next.
+ """
+ null = object()
+ last_result = null
+ for is_callback, result, method, args, kwargs in self.calls:
+ if is_callback:
+ # If it was scheduled as a callback, take care of previous
+ # result.
+ result = result if last_result is null else last_result
+ last_result = method(result, *args, **kwargs)
+ else:
+ # If it was scheduled as a sync call, just execute the method.
+ method(*args, **kwargs)
+
+
class ArchiveMirrorProberCallbacks(LoggingMixin):
expected_failures = (
@@ -592,13 +633,15 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
InvalidHTTPSCertificateSkipped,
)
- def __init__(self, mirror, series, pocket, component, url, log_file):
+ def __init__(self, mirror, series, pocket, component, url, log_file,
+ call_sched=None):
self.mirror = mirror
self.series = series
self.pocket = pocket
self.component = component
self.url = url
self.log_file = log_file
+ self.call_sched = call_sched
if IDistroArchSeries.providedBy(series):
self.mirror_class_name = 'MirrorDistroArchSeries'
self.deleteMethod = self.mirror.deleteMirrorDistroArchSeries
@@ -661,24 +704,29 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
# this host and thus should skip it, so it's better to delete this
# MirrorDistroArchSeries/MirrorDistroSeriesSource than to keep
# it with an UNKNOWN freshness.
- self.deleteMethod(self.series, self.pocket, self.component)
+ self.call_sched.sched(
+ self.deleteMethod, self.series, self.pocket, self.component)
return
deferredList = []
# We start setting the freshness to unknown, and then we move on
# trying to find one of the recently published packages mirrored
# there.
- arch_or_source_mirror.freshness = MirrorFreshness.UNKNOWN
+ self.call_sched.sched(
+ self.setMirrorFreshnessUnknown, arch_or_source_mirror)
for freshness, url in freshness_url_map.items():
prober = RedirectAwareProberFactory(url)
deferred = request_manager.run(prober.request_host, prober.probe)
- deferred.addCallback(
- self.setMirrorFreshness, arch_or_source_mirror, freshness,
- url)
deferred.addErrback(self.logError, url)
+ deferred.addCallback(self.call_sched.schedCallback(
+ self.setMirrorFreshness, arch_or_source_mirror,
+ freshness, url))
deferredList.append(deferred)
return defer.DeferredList(deferredList)
+ def setMirrorFreshnessUnknown(self, arch_or_source_mirror):
+ arch_or_source_mirror.freshness = MirrorFreshness.UNKNOWN
+
def setMirrorFreshness(
self, http_status, arch_or_source_mirror, freshness, url):
"""Update the freshness of the given arch or source mirror.
@@ -688,9 +736,11 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
"""
if freshness < arch_or_source_mirror.freshness:
msg = ('Found that %s exists. Updating %s of %s freshness to '
- '%s.\n' % (url, self.mirror_class_name,
- self._getSeriesPocketAndComponentDescription(),
- freshness.title))
+ '%s.\n')
+ msg = msg % (
+ url, self.mirror_class_name,
+ self._getSeriesPocketAndComponentDescription(),
+ freshness.title)
self.logMessage(msg)
arch_or_source_mirror.freshness = freshness
@@ -779,6 +829,15 @@ class MirrorCDImageProberCallbacks(LoggingMixin):
"Failed %s: %s\n" % (url, failure.getErrorMessage()))
return failure
+ def urlCallback(self, result, url):
+ """The callback to be called for each URL."""
+ if isinstance(result, Failure):
+ self.logMissingURL(result, url)
+
+ def finalResultCallback(self, result):
+ """The callback to be called once all URLs have been probed."""
+ return self.ensureOrDeleteMirrorCDImageSeries(result)
+
def _get_cdimage_file_list():
url = config.distributionmirrorprober.cdimage_file_list_url
@@ -851,23 +910,34 @@ def probe_archive_mirror(mirror, logfile, unchecked_keys, logger,
sources_paths = mirror.getExpectedSourcesPaths()
all_paths = itertools.chain(packages_paths, sources_paths)
request_manager = RequestManager(max_parallel, max_parallel_per_host)
+
+ call_scheds = []
for series, pocket, component, path in all_paths:
+ sched = CallScheduler(mirror, series)
+ call_scheds.append(sched)
url = urljoin(base_url, path)
callbacks = ArchiveMirrorProberCallbacks(
- mirror, series, pocket, component, url, logfile)
+ mirror, series, pocket, component, url, logfile, sched)
unchecked_keys.append(url)
# APT has supported redirects since 0.7.21 (2009-04-14), so allow
# them here too.
prober = RedirectAwareProberFactory(url)
deferred = request_manager.run(prober.request_host, prober.probe)
+
+ # XXX pappacena 2020-11-25: This will do some database operations
+ # inside the reactor, which might cause problems like timeouts when
+ # running HTTP requests. This should be the next optimization point:
+ # run {ensure|delete}MirrorSeries and gather all mirror freshness URLs
+ # synchronously here, and ask the reactor to run just the HTTP requests.
deferred.addCallbacks(
callbacks.ensureMirrorSeries, callbacks.deleteMirrorSeries)
-
- deferred.addCallback(callbacks.updateMirrorFreshness, request_manager)
+ deferred.addCallback(
+ callbacks.updateMirrorFreshness, request_manager)
deferred.addErrback(logger.error)
deferred.addBoth(checkComplete, url, unchecked_keys)
+ return call_scheds
def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger,
@@ -894,6 +964,7 @@ def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger,
logger.error(e)
return
+ call_scheds = []
for series, flavour, paths in cdimage_paths:
callbacks = MirrorCDImageProberCallbacks(
mirror, series, flavour, logfile)
@@ -906,14 +977,21 @@ def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger,
url = urljoin(base_url, path)
# Use a RedirectAwareProberFactory because CD mirrors are allowed
# to redirect, and we need to cope with that.
+ sched = CallScheduler(mirror, series)
+ call_scheds.append(sched)
prober = RedirectAwareProberFactory(url)
deferred = request_manager.run(prober.request_host, prober.probe)
- deferred.addErrback(callbacks.logMissingURL, url)
+ deferred.addErrback(
+ sched.schedCallback(callbacks.urlCallback, url))
deferredList.append(deferred)
+ sched = CallScheduler(mirror, series)
+ call_scheds.append(sched)
deferredList = defer.DeferredList(deferredList, consumeErrors=True)
- deferredList.addCallback(callbacks.ensureOrDeleteMirrorCDImageSeries)
+ deferredList.addCallback(
+ sched.schedCallback(callbacks.finalResultCallback))
deferredList.addCallback(checkComplete, mirror_key, unchecked_keys)
+ return call_scheds
def should_skip_host(host):
@@ -1029,6 +1107,7 @@ class DistroMirrorProber:
logfiles = {}
probed_mirrors = []
+ all_scheduled_calls = []
for mirror_id in mirror_ids:
mirror = mirror_set[mirror_id]
if not self._sanity_check_mirror(mirror):
@@ -1047,12 +1126,18 @@ class DistroMirrorProber:
probed_mirrors.append(mirror)
logfile = six.StringIO()
logfiles[mirror_id] = logfile
- probe_function(mirror, logfile, unchecked_keys, self.logger,
- max_parallel, max_parallel_per_host)
+ prob_scheduled_calls = probe_function(
+ mirror, logfile, unchecked_keys, self.logger,
+ max_parallel, max_parallel_per_host)
+ all_scheduled_calls += prob_scheduled_calls
if probed_mirrors:
reactor.run()
self.logger.info('Probed %d mirrors.' % len(probed_mirrors))
+ self.logger.info(
+ 'Starting to update mirrors statuses outside reactor now.')
+ for sched_calls in all_scheduled_calls:
+ sched_calls.run()
else:
self.logger.info('No mirrors to probe.')
diff --git a/lib/lp/registry/tests/test_distributionmirror_prober.py b/lib/lp/registry/tests/test_distributionmirror_prober.py
index 4e6abab..ee7780f 100644
--- a/lib/lp/registry/tests/test_distributionmirror_prober.py
+++ b/lib/lp/registry/tests/test_distributionmirror_prober.py
@@ -8,6 +8,8 @@ __metaclass__ = type
from datetime import datetime
import logging
import os
+import re
+from textwrap import dedent
from fixtures import MockPatchObject
from lazr.uri import URI
@@ -25,6 +27,7 @@ from testtools.twistedsupport import (
AsynchronousDeferredRunTest,
AsynchronousDeferredRunTestForBrokenTwisted,
)
+import transaction
from twisted.internet import (
defer,
reactor,
@@ -37,8 +40,18 @@ from zope.component import getUtility
from zope.security.proxy import removeSecurityProxy
from lp.app.interfaces.launchpad import ILaunchpadCelebrities
+from lp.registry.interfaces.distributionmirror import (
+ MirrorContent,
+ MirrorStatus,
+ )
from lp.registry.interfaces.pocket import PackagePublishingPocket
-from lp.registry.model.distributionmirror import DistributionMirror
+from lp.registry.model.distributionmirror import (
+ DistributionMirror,
+ MirrorCDImageDistroSeries,
+ MirrorDistroArchSeries,
+ MirrorDistroSeriesSource,
+ MirrorProbeRecord,
+ )
from lp.registry.scripts import distributionmirror_prober
from lp.registry.scripts.distributionmirror_prober import (
_get_cdimage_file_list,
@@ -72,14 +85,18 @@ from lp.registry.tests.distributionmirror_http_server import (
)
from lp.services.config import config
from lp.services.daemons.tachandler import TacTestSetup
+from lp.services.database.interfaces import IStore
from lp.services.httpproxy.connect_tunneling import TunnelingAgent
from lp.services.timeout import default_timeout
from lp.testing import (
+ admin_logged_in,
clean_up_reactor,
+ run_script,
TestCase,
TestCaseWithFactory,
)
from lp.testing.layers import (
+ LaunchpadZopelessLayer,
TwistedLayer,
ZopelessDatabaseLayer,
)
@@ -1174,3 +1191,117 @@ class TestLoggingMixin(TestCase):
logger.log_file.seek(0)
message = logger.log_file.read()
self.assertNotEqual(None, message)
+
+
+class TestDistroMirrorProberFunctional(TestCaseWithFactory):
+
+ layer = LaunchpadZopelessLayer
+
+ def setUp(self):
+ super(TestDistroMirrorProberFunctional, self).setUp()
+ # Makes a clean distro mirror set, with only the mirrors we want.
+ self.removeMirrors()
+
+ def removeMirrors(self):
+ """Removes all mirror information from database."""
+ store = IStore(DistributionMirror)
+ store.find(MirrorProbeRecord).remove()
+ store.find(MirrorDistroArchSeries).remove()
+ store.find(MirrorDistroSeriesSource).remove()
+ store.find(MirrorCDImageDistroSeries).remove()
+ store.find(DistributionMirror).remove()
+ store.flush()
+
+ def makeMirror(self, content_type, distro=None):
+ with admin_logged_in():
+ if distro is None:
+ distro = self.factory.makeDistribution()
+ distro.supports_mirrors = True
+ self.factory.makeDistroSeries(distribution=distro)
+ mirror = self.factory.makeMirror(
+ distro, http_url="http://fake-url.invalid")
+ mirror.enabled = True
+ mirror.status = MirrorStatus.OFFICIAL
+ mirror.official_candidate = True
+ mirror.content = content_type
+ return mirror
+
+ def test_cdimage_prober(self):
+ """Checks that CD image prober works fine, end to end."""
+ mirror = self.makeMirror(content_type=MirrorContent.RELEASE)
+ transaction.commit()
+
+ out, err, exit_code = run_script(
+ "cronscripts/distributionmirror-prober.py --no-remote-hosts "
+ "--content-type=cdimage --no-owner-notification --force")
+ self.assertEqual(0, exit_code, err)
+
+ lock_file = "/var/lock/launchpad-distributionmirror-prober.lock"
+ self.assertEqual(dedent("""\
+ INFO Creating lockfile: %s
+ INFO Probing CD Image Mirrors
+ INFO Probed 1 mirrors.
+ INFO Starting to update mirrors statuses outside reactor now.
+ INFO Done.
+ """) % lock_file, err)
+
+ with admin_logged_in():
+ record = removeSecurityProxy(mirror.last_probe_record)
+
+ log_lines = record.log_file.read()
+ self.assertEqual(4, len(log_lines.split("\n")))
+ self.assertIn(
+ "Found all ISO images for series The Hoary Hedgehog Release "
+ "and flavour kubuntu.", log_lines)
+ self.assertIn(
+ "Found all ISO images for series The Hoary Hedgehog Release "
+ "and flavour ubuntu.", log_lines)
+ self.assertIn(
+ "Found all ISO images for series The Warty Warthog Release "
+ "and flavour ubuntu.", log_lines)
+
+ def test_archive_prober(self):
+ """Checks that archive prober works fine, end to end."""
+ # Using ubuntu to avoid the need to create all the packages that
+ # will be checked by prober.
+ ubuntu = getUtility(ILaunchpadCelebrities).ubuntu
+ mirror = self.makeMirror(
+ content_type=MirrorContent.ARCHIVE, distro=ubuntu)
+ transaction.commit()
+
+ out, err, exit_code = run_script(
+ "cronscripts/distributionmirror-prober.py --no-remote-hosts "
+ "--content-type=archive --no-owner-notification --force")
+ self.assertEqual(0, exit_code, err)
+
+ lock_file = "/var/lock/launchpad-distributionmirror-prober.lock"
+ self.assertEqual(dedent("""\
+ INFO Creating lockfile: %s
+ INFO Probing Archive Mirrors
+ INFO Probed 1 mirrors.
+ INFO Starting to update mirrors statuses outside reactor now.
+ INFO Done.
+ """) % lock_file, err)
+
+ with admin_logged_in():
+ record = removeSecurityProxy(mirror.last_probe_record)
+
+ log_lines = record.log_file.read()
+
+ # Make sure that prober output seems reasonable.
+ self.assertEqual(85, len(log_lines.split("\n")))
+ url = "http://fake-url.invalid/dists/"
+ self.assertEqual(40, len(re.findall(
+ (r"Ensuring MirrorDistroSeries of .* with url %s" % url) +
+ r".* exists in the database",
+ log_lines)))
+ self.assertEqual(40, len(re.findall(
+ (r"Ensuring MirrorDistroArchSeries of .* with url %s" % url) +
+ r".* exists in the database",
+ log_lines)))
+ self.assertEqual(1, len(re.findall(
+ r"Updating MirrorDistroArchSeries of .* freshness to Up to date",
+ log_lines)))
+ self.assertEqual(3, len(re.findall(
+ r"Updating MirrorDistroSeries of .* freshness to Up to date",
+ log_lines)))