← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~cjwatson/launchpad/celery-soft-timeout-improve-cleanup into lp:launchpad

 

Colin Watson has proposed merging lp:~cjwatson/launchpad/celery-soft-timeout-improve-cleanup into lp:launchpad.

Commit message:
Clean up with_timeout worker thread upon receiving SoftTimeLimitExceeded from celery.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1783315 in Launchpad itself: "celery hung after long git repository scan"
  https://bugs.launchpad.net/launchpad/+bug/1783315

For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/celery-soft-timeout-improve-cleanup/+merge/351361

I spent ages trying to construct unit tests for this, but I couldn't manage to make it work: mocks don't propagate into celery worker processes.  I have at least tested it in a local deployment and confirmed that, before this patch, a request from a job that received SoftTimeLimitExceeded emitted a log entry some time later indicating that it hadn't been cancelled, while with this patch that doesn't happen.

I'm not certain that this is what caused the celery master to stop dispatching workers, since I couldn't reproduce that particular failure mode locally, but this seems like a possible cause and worth trying.
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~cjwatson/launchpad/celery-soft-timeout-improve-cleanup into lp:launchpad.
=== modified file 'lib/lp/services/timeout.py'
--- lib/lp/services/timeout.py	2018-07-13 12:48:19 +0000
+++ lib/lp/services/timeout.py	2018-07-26 14:53:40 +0000
@@ -202,6 +202,16 @@
 
     def __call__(self, f):
         """Wraps the method."""
+        def cleanup(t, args):
+            if self.cleanup is not None:
+                if isinstance(self.cleanup, basestring):
+                    # 'self' will be first positional argument.
+                    getattr(args[0], self.cleanup)()
+                else:
+                    self.cleanup()
+                # Collect cleaned-up worker thread.
+                t.join()
+
         def call_with_timeout(*args, **kwargs):
             # Ensure that we have a timeout before we start the thread
             timeout = self.timeout
@@ -214,16 +224,17 @@
                     timeout = timeout()
             t = ThreadCapturingResult(f, args, kwargs)
             t.start()
-            t.join(timeout)
+            try:
+                t.join(timeout)
+            except Exception as e:
+                # This will commonly be SoftTimeLimitExceeded from celery,
+                # since celery's timeout often happens before the job's due
+                # to job setup time.
+                if t.isAlive():
+                    cleanup(t, args)
+                raise
             if t.isAlive():
-                if self.cleanup is not None:
-                    if isinstance(self.cleanup, basestring):
-                        # 'self' will be first positional argument.
-                        getattr(args[0], self.cleanup)()
-                    else:
-                        self.cleanup()
-                    # Collect cleaned-up worker thread.
-                    t.join()
+                cleanup(t, args)
                 raise TimeoutError("timeout exceeded.")
             if getattr(t, 'exc_info', None) is not None:
                 exc_info = t.exc_info


Follow ups