← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~pappacena/launchpad:mirror-prober-untwist-db-calls into launchpad:master

 

Thiago F. Pappacena has proposed merging ~pappacena/launchpad:mirror-prober-untwist-db-calls into launchpad:master.

Commit message:
Move response processing out of the reactor in the mirror prober to avoid spurious HTTP timeouts

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~pappacena/launchpad/+git/launchpad/+merge/394494

Ideally, we should keep the reactor free to perform the HTTP requests as fast as possible, and move all other processing (especially database calls) outside the event loop. This refactoring records the callback calls as they are made, so that they can be run after the reactor has finished the batch of HTTP requests.

Unfortunately, the archive prober needs the response to an initial HTTP request before it can do some database operations, and only after that does it fire off several more HTTP requests. Since the original bug report is not about the archive prober, we leave that refactoring for the future.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/launchpad:mirror-prober-untwist-db-calls into launchpad:master.
diff --git a/lib/lp/registry/scripts/distributionmirror_prober.py b/lib/lp/registry/scripts/distributionmirror_prober.py
index ca6c4d0..6d2fbf2 100644
--- a/lib/lp/registry/scripts/distributionmirror_prober.py
+++ b/lib/lp/registry/scripts/distributionmirror_prober.py
@@ -582,6 +582,47 @@ class UnknownURLSchemeAfterRedirect(UnknownURLScheme):
                 "to check this kind of URL." % self.url)
 
 
+class CallScheduler:
+    """Keep track of the calls done as callback of deferred or directly,
+    so we can postpone them to after the reactor is done.
+
+    The main limitation for deferred callbacks is that we don't deal with
+    errors here. You should do error handling synchronously on the methods
+    scheduled.
+    """
+    def __init__(self, mirror, series):
+        self.mirror = mirror
+        self.series = series
+        # A list of tuples with the format:
+        # (is_a_callback, callback_result, method, args, kwargs)
+        self.calls = []
+
+    def sched(self, method, *args, **kwargs):
+        self.calls.append((False, None, method, args, kwargs))
+
+    def schedCallback(self, method, *args, **kwargs):
+        def callback(result):
+            self.calls.append((True, result, method, args, kwargs))
+            return result
+        return callback
+
+    def run(self):
+        """Runs all the delayed calls, passing forward the result from one
+        callback to the next.
+        """
+        null = object()
+        last_result = null
+        for is_callback, result, method, args, kwargs in self.calls:
+            if is_callback:
+                # If it was scheduled as a callback, take care of previous
+                # result.
+                result = result if last_result is null else last_result
+                last_result = method(result, *args, **kwargs)
+            else:
+                # If it was scheduled as a sync call, just execute the method.
+                method(*args, **kwargs)
+
+
 class ArchiveMirrorProberCallbacks(LoggingMixin):
 
     expected_failures = (
@@ -592,13 +633,15 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
         InvalidHTTPSCertificateSkipped,
         )
 
-    def __init__(self, mirror, series, pocket, component, url, log_file):
+    def __init__(self, mirror, series, pocket, component, url, log_file,
+                 call_sched=None):
         self.mirror = mirror
         self.series = series
         self.pocket = pocket
         self.component = component
         self.url = url
         self.log_file = log_file
+        self.call_sched = call_sched
         if IDistroArchSeries.providedBy(series):
             self.mirror_class_name = 'MirrorDistroArchSeries'
             self.deleteMethod = self.mirror.deleteMirrorDistroArchSeries
@@ -661,24 +704,29 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
             # this host and thus should skip it, so it's better to delete this
             # MirrorDistroArchSeries/MirrorDistroSeriesSource than to keep
             # it with an UNKNOWN freshness.
-            self.deleteMethod(self.series, self.pocket, self.component)
+            self.call_sched.sched(
+                self.deleteMethod, self.series, self.pocket, self.component)
             return

         deferredList = []
         # We start setting the freshness to unknown, and then we move on
         # trying to find one of the recently published packages mirrored
         # there.
-        arch_or_source_mirror.freshness = MirrorFreshness.UNKNOWN
+        self.call_sched.sched(
+            self.setMirrorFreshnessUnknown, arch_or_source_mirror)
         for freshness, url in freshness_url_map.items():
             prober = RedirectAwareProberFactory(url)
             deferred = request_manager.run(prober.request_host, prober.probe)
-            deferred.addCallback(
-                self.setMirrorFreshness, arch_or_source_mirror, freshness,
-                url)
+            deferred.addCallback(self.call_sched.schedCallback(
+                self.setMirrorFreshness, arch_or_source_mirror,
+                freshness, url))
             deferred.addErrback(self.logError, url)
             deferredList.append(deferred)
         return defer.DeferredList(deferredList)

+    def setMirrorFreshnessUnknown(self, arch_or_source_mirror):
+        arch_or_source_mirror.freshness = MirrorFreshness.UNKNOWN
+
     def setMirrorFreshness(
             self, http_status, arch_or_source_mirror, freshness, url):
         """Update the freshness of the given arch or source mirror.
@@ -688,9 +736,11 @@ class ArchiveMirrorProberCallbacks(LoggingMixin):
         """
         if freshness < arch_or_source_mirror.freshness:
             msg = ('Found that %s exists. Updating %s of %s freshness to '
-                   '%s.\n' % (url, self.mirror_class_name,
-                              self._getSeriesPocketAndComponentDescription(),
-                              freshness.title))
+                   '%s.\n')
+            msg = msg % (
+                url, self.mirror_class_name,
+                self._getSeriesPocketAndComponentDescription(),
+                freshness.title)
             self.logMessage(msg)
             arch_or_source_mirror.freshness = freshness
 
@@ -779,6 +829,15 @@ class MirrorCDImageProberCallbacks(LoggingMixin):
             "Failed %s: %s\n" % (url, failure.getErrorMessage()))
         return failure
 
+    def urlCallback(self, result, url):
+        """The callback to be called for each URL."""
+        if isinstance(result, Failure):
+            self.logMissingURL(result, url)
+
+    def finalResultCallback(self, result):
+        """The callback to be called once all URLs have been probed."""
+        return self.ensureOrDeleteMirrorCDImageSeries(result)
+
 
 def _get_cdimage_file_list():
     url = config.distributionmirrorprober.cdimage_file_list_url
@@ -851,23 +910,34 @@ def probe_archive_mirror(mirror, logfile, unchecked_keys, logger,
     sources_paths = mirror.getExpectedSourcesPaths()
     all_paths = itertools.chain(packages_paths, sources_paths)
     request_manager = RequestManager(max_parallel, max_parallel_per_host)
+
+    call_scheds = []
     for series, pocket, component, path in all_paths:
+        sched = CallScheduler(mirror, series)
+        call_scheds.append(sched)
         url = urljoin(base_url, path)
         callbacks = ArchiveMirrorProberCallbacks(
-            mirror, series, pocket, component, url, logfile)
+            mirror, series, pocket, component, url, logfile, sched)
         unchecked_keys.append(url)
         # APT has supported redirects since 0.7.21 (2009-04-14), so allow
         # them here too.
         prober = RedirectAwareProberFactory(url)
 
         deferred = request_manager.run(prober.request_host, prober.probe)
+
+        # XXX pappacena 2020-11-25: This will do some database operation
+        # inside reactor, which might cause problems like timeouts when
+        # running HTTP requests. This should be the next optimization point:
+        # run {ensure|delete}MirrorSeries and gather all mirror freshness URLs
+        # synchronously here, and ask reactor to run just the HTTP requests.
         deferred.addCallbacks(
             callbacks.ensureMirrorSeries, callbacks.deleteMirrorSeries)
-
-        deferred.addCallback(callbacks.updateMirrorFreshness, request_manager)
+        deferred.addCallback(
+            callbacks.updateMirrorFreshness, request_manager)
         deferred.addErrback(logger.error)
 
         deferred.addBoth(checkComplete, url, unchecked_keys)
+    return call_scheds
 
 
 def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger,
@@ -894,6 +964,7 @@ def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger,
         logger.error(e)
         return
 
+    call_scheds = []
     for series, flavour, paths in cdimage_paths:
         callbacks = MirrorCDImageProberCallbacks(
             mirror, series, flavour, logfile)
@@ -906,14 +977,21 @@ def probe_cdimage_mirror(mirror, logfile, unchecked_keys, logger,
             url = urljoin(base_url, path)
             # Use a RedirectAwareProberFactory because CD mirrors are allowed
             # to redirect, and we need to cope with that.
+            sched = CallScheduler(mirror, series)
+            call_scheds.append(sched)
             prober = RedirectAwareProberFactory(url)
             deferred = request_manager.run(prober.request_host, prober.probe)
-            deferred.addErrback(callbacks.logMissingURL, url)
+            deferred.addErrback(
+                sched.schedCallback(callbacks.urlCallback, url))
             deferredList.append(deferred)
 
+        sched = CallScheduler(mirror, series)
+        call_scheds.append(sched)
         deferredList = defer.DeferredList(deferredList, consumeErrors=True)
-        deferredList.addCallback(callbacks.ensureOrDeleteMirrorCDImageSeries)
+        deferredList.addCallback(
+            sched.schedCallback(callbacks.finalResultCallback))
         deferredList.addCallback(checkComplete, mirror_key, unchecked_keys)
+    return call_scheds
 
 
 def should_skip_host(host):
@@ -1029,6 +1107,7 @@ class DistroMirrorProber:
         logfiles = {}
         probed_mirrors = []
 
+        all_scheduled_calls = []
         for mirror_id in mirror_ids:
             mirror = mirror_set[mirror_id]
             if not self._sanity_check_mirror(mirror):
@@ -1047,12 +1126,18 @@ class DistroMirrorProber:
             probed_mirrors.append(mirror)
             logfile = six.StringIO()
             logfiles[mirror_id] = logfile
-            probe_function(mirror, logfile, unchecked_keys, self.logger,
-                           max_parallel, max_parallel_per_host)
+            prob_scheduled_calls = probe_function(
+                mirror, logfile, unchecked_keys, self.logger,
+                max_parallel, max_parallel_per_host)
+            all_scheduled_calls += prob_scheduled_calls
 
         if probed_mirrors:
             reactor.run()
             self.logger.info('Probed %d mirrors.' % len(probed_mirrors))
+            self.logger.info(
+                'Starting to update mirrors statuses outside reactor now.')
+            for sched_calls in all_scheduled_calls:
+                sched_calls.run()
         else:
             self.logger.info('No mirrors to probe.')
 
diff --git a/lib/lp/registry/tests/test_distributionmirror_prober.py b/lib/lp/registry/tests/test_distributionmirror_prober.py
index 4e6abab..ee7780f 100644
--- a/lib/lp/registry/tests/test_distributionmirror_prober.py
+++ b/lib/lp/registry/tests/test_distributionmirror_prober.py
@@ -8,6 +8,8 @@ __metaclass__ = type
 from datetime import datetime
 import logging
 import os
+import re
+from textwrap import dedent
 
 from fixtures import MockPatchObject
 from lazr.uri import URI
@@ -25,6 +27,7 @@ from testtools.twistedsupport import (
     AsynchronousDeferredRunTest,
     AsynchronousDeferredRunTestForBrokenTwisted,
     )
+import transaction
 from twisted.internet import (
     defer,
     reactor,
@@ -37,8 +40,18 @@ from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
 
 from lp.app.interfaces.launchpad import ILaunchpadCelebrities
+from lp.registry.interfaces.distributionmirror import (
+    MirrorContent,
+    MirrorStatus,
+    )
 from lp.registry.interfaces.pocket import PackagePublishingPocket
-from lp.registry.model.distributionmirror import DistributionMirror
+from lp.registry.model.distributionmirror import (
+    DistributionMirror,
+    MirrorCDImageDistroSeries,
+    MirrorDistroArchSeries,
+    MirrorDistroSeriesSource,
+    MirrorProbeRecord,
+    )
 from lp.registry.scripts import distributionmirror_prober
 from lp.registry.scripts.distributionmirror_prober import (
     _get_cdimage_file_list,
@@ -72,14 +85,18 @@ from lp.registry.tests.distributionmirror_http_server import (
     )
 from lp.services.config import config
 from lp.services.daemons.tachandler import TacTestSetup
+from lp.services.database.interfaces import IStore
 from lp.services.httpproxy.connect_tunneling import TunnelingAgent
 from lp.services.timeout import default_timeout
 from lp.testing import (
+    admin_logged_in,
     clean_up_reactor,
+    run_script,
     TestCase,
     TestCaseWithFactory,
     )
 from lp.testing.layers import (
+    LaunchpadZopelessLayer,
     TwistedLayer,
     ZopelessDatabaseLayer,
     )
@@ -1174,3 +1191,117 @@ class TestLoggingMixin(TestCase):
         logger.log_file.seek(0)
         message = logger.log_file.read()
         self.assertNotEqual(None, message)
+
+
+class TestDistroMirrorProberFunctional(TestCaseWithFactory):
+
+    layer = LaunchpadZopelessLayer
+
+    def setUp(self):
+        super(TestDistroMirrorProberFunctional, self).setUp()
+        # Makes a clean distro mirror set, with only the mirrors we want.
+        self.removeMirrors()
+
+    def removeMirrors(self):
+        """Removes all mirror information from database."""
+        store = IStore(DistributionMirror)
+        store.find(MirrorProbeRecord).remove()
+        store.find(MirrorDistroArchSeries).remove()
+        store.find(MirrorDistroSeriesSource).remove()
+        store.find(MirrorCDImageDistroSeries).remove()
+        store.find(DistributionMirror).remove()
+        store.flush()
+
+    def makeMirror(self, content_type, distro=None):
+        with admin_logged_in():
+            if distro is None:
+                distro = self.factory.makeDistribution()
+                distro.supports_mirrors = True
+                self.factory.makeDistroSeries(distribution=distro)
+            mirror = self.factory.makeMirror(
+                distro, http_url="http://fake-url.invalid")
+            mirror.enabled = True
+            mirror.status = MirrorStatus.OFFICIAL
+            mirror.official_candidate = True
+            mirror.content = content_type
+        return mirror
+
+    def test_cdimage_prober(self):
+        """Checks that CD image prober works fine, end to end."""
+        mirror = self.makeMirror(content_type=MirrorContent.RELEASE)
+        transaction.commit()
+
+        out, err, exit_code = run_script(
+            "cronscripts/distributionmirror-prober.py --no-remote-hosts "
+            "--content-type=cdimage --no-owner-notification --force")
+        self.assertEqual(0, exit_code, err)
+
+        lock_file = "/var/lock/launchpad-distributionmirror-prober.lock"
+        self.assertEqual(dedent("""\
+            INFO    Creating lockfile: %s
+            INFO    Probing CD Image Mirrors
+            INFO    Probed 1 mirrors.
+            INFO    Starting to update mirrors statuses outside reactor now.
+            INFO    Done.
+            """) % lock_file, err)
+
+        with admin_logged_in():
+            record = removeSecurityProxy(mirror.last_probe_record)
+
+        log_lines = record.log_file.read()
+        self.assertEqual(4, len(log_lines.split("\n")))
+        self.assertIn(
+            "Found all ISO images for series The Hoary Hedgehog Release "
+            "and flavour kubuntu.", log_lines)
+        self.assertIn(
+            "Found all ISO images for series The Hoary Hedgehog Release "
+            "and flavour ubuntu.", log_lines)
+        self.assertIn(
+            "Found all ISO images for series The Warty Warthog Release "
+            "and flavour ubuntu.", log_lines)
+
+    def test_archive_prober(self):
+        """Checks that archive prober works fine, end to end."""
+        # Using ubuntu to avoid the need to create all the packages that
+        # will be checked by prober.
+        ubuntu = getUtility(ILaunchpadCelebrities).ubuntu
+        mirror = self.makeMirror(
+            content_type=MirrorContent.ARCHIVE, distro=ubuntu)
+        transaction.commit()
+
+        out, err, exit_code = run_script(
+            "cronscripts/distributionmirror-prober.py --no-remote-hosts "
+            "--content-type=archive --no-owner-notification --force")
+        self.assertEqual(0, exit_code, err)
+
+        lock_file = "/var/lock/launchpad-distributionmirror-prober.lock"
+        self.assertEqual(dedent("""\
+            INFO    Creating lockfile: %s
+            INFO    Probing Archive Mirrors
+            INFO    Probed 1 mirrors.
+            INFO    Starting to update mirrors statuses outside reactor now.
+            INFO    Done.
+            """) % lock_file, err)
+
+        with admin_logged_in():
+            record = removeSecurityProxy(mirror.last_probe_record)
+
+        log_lines = record.log_file.read()
+
+        # Make sure that prober output seems reasonable.
+        self.assertEqual(85, len(log_lines.split("\n")))
+        url = "http://fake-url.invalid/dists/"
+        self.assertEqual(40, len(re.findall(
+            (r"Ensuring MirrorDistroSeries of .* with url %s" % url) +
+            r".* exists in the database",
+            log_lines)))
+        self.assertEqual(40, len(re.findall(
+            (r"Ensuring MirrorDistroArchSeries of .* with url %s" % url) +
+            r".* exists in the database",
+            log_lines)))
+        self.assertEqual(1, len(re.findall(
+            r"Updating MirrorDistroArchSeries of .* freshness to Up to date",
+            log_lines)))
+        self.assertEqual(3, len(re.findall(
+            r"Updating MirrorDistroSeries of .* freshness to Up to date",
+            log_lines)))

Follow ups