← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~jtv/maas/bug-1059453 into lp:maas

 

Jeroen T. Vermeulen has proposed merging lp:~jtv/maas/bug-1059453 into lp:maas.

Commit message:
Run celeryd synchronously, so Upstart doesn't need to track fork()s (many of which precede the operative one as maas-provision starts up).

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1059453 in MAAS: "The celery cluster worker is not properly stopped"
  https://bugs.launchpad.net/maas/+bug/1059453

For more details, see:
https://code.launchpad.net/~jtv/maas/bug-1059453/+merge/127680

Discussed with Julian and many others.  The "maas-provision start-cluster-controller" command will now keep running until either the cluster controller is rejected by an administrator, or celeryd exits for whatever reason.


Jeroen
-- 
https://code.launchpad.net/~jtv/maas/bug-1059453/+merge/127680
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jtv/maas/bug-1059453 into lp:maas.
=== modified file 'src/provisioningserver/start_cluster_controller.py'
--- src/provisioningserver/start_cluster_controller.py	2012-10-02 10:20:17 +0000
+++ src/provisioningserver/start_cluster_controller.py	2012-10-03 09:30:46 +0000
@@ -19,6 +19,7 @@
 import json
 import os
 from pwd import getpwnam
+from subprocess import check_call
 from time import sleep
 from urllib2 import (
     HTTPError,
@@ -121,6 +122,20 @@
         raise AssertionError("Unexpected return code: %r" % status_code)
 
 
+class Become:
+    """Set user and group id."""
+
+    def __init__(self, uid, gid):
+        self.uid = uid
+        self.gid = gid
+
+    def __call__(self):
+        # Change gid first, just in case changing the uid might deprive
+        # us of the privileges required to setgid.
+        os.setgid(self.gid)
+        os.setuid(self.uid)
+
+
 def start_celery(connection_details, user, group):
     broker_url = connection_details['BROKER_URL']
     uid = getpwnam(user).pw_uid
@@ -137,12 +152,9 @@
         '-Q', get_cluster_uuid(),
         ]
 
-    pid = os.fork()
-    if pid == 0:
-        # Child process.  Become the right user, and start celery.
-        os.setuid(uid)
-        os.setgid(gid)
-        os.execvpe(command[0], command, env)
+    return_code = check_call(command, env=env, preexec_fn=Become(uid, gid))
+    if return_code != 0:
+        raise SystemExit(return_code)
 
 
 def request_refresh(server_url):
@@ -161,9 +173,12 @@
     This starts up celeryd, listening to the broker that the region
     controller pointed us to, and on the appropriate queue.
     """
+    # Get the region controller to send out credentials.  If the message
+    # arrives before celeryd has started up, we should find it waiting
+    # in our queue.  Even if we're new and the queue did not exist yet,
+    # the arriving task will create the queue.
+    request_refresh(server_url)
     start_celery(connection_details, user=user, group=group)
-    sleep(10)
-    request_refresh(server_url)
 
 
 def run(args):

=== modified file 'src/provisioningserver/tests/test_start_cluster_controller.py'
--- src/provisioningserver/tests/test_start_cluster_controller.py	2012-10-02 10:20:17 +0000
+++ src/provisioningserver/tests/test_start_cluster_controller.py	2012-10-03 09:30:46 +0000
@@ -17,7 +17,6 @@
 import httplib
 from io import BytesIO
 import json
-import os
 from random import randint
 from urllib2 import (
     HTTPError,
@@ -87,10 +86,8 @@
         self.patch(start_cluster_controller, 'getpwnam')
         start_cluster_controller.getpwnam.pw_uid = randint(3000, 4000)
         start_cluster_controller.getpwnam.pw_gid = randint(3000, 4000)
-        self.patch(os, 'fork').side_effect = Executing()
-        self.patch(os, 'execvpe').side_effect = Executing()
-        self.patch(os, 'setuid')
-        self.patch(os, 'setgid')
+        self.patch(
+            start_cluster_controller, 'check_call').side_effect = Executing()
         get_uuid = self.patch(start_cluster_controller, 'get_cluster_uuid')
         get_uuid.return_value = factory.getRandomUUID()
 
@@ -133,36 +130,18 @@
         """Prepare to return "request pending" from API request."""
         self.prepare_response(httplib.ACCEPTED)
 
-    def pretend_to_fork_into_child(self):
-        """Make `fork` act as if it's returning into the child process.
-
-        The start_cluster_controller child process then executes celeryd,
-        so this call also patches up the call that does that so it pretends
-        to be successful.
-        """
-        self.patch(os, 'fork').return_value = 0
-        self.patch(os, 'execvpe')
-
-    def pretend_to_fork_into_parent(self):
-        """Make `fork` act as if it's returning into the parent process."""
-        self.patch(os, 'fork').return_value = randint(2, 65535)
-
     def test_run_command(self):
         # We can't really run the script, but we can verify that (with
         # the right system functions patched out) we can run it
         # directly.
-        self.pretend_to_fork_into_child()
-        self.patch(start_cluster_controller, 'sleep')
+        start_cluster_controller.sleep.side_effect = None
+        start_cluster_controller.check_call.side_effect = None
+        start_cluster_controller.check_call.return_value = 0
         self.prepare_success_response()
         parser = ArgumentParser()
         start_cluster_controller.add_arguments(parser)
         start_cluster_controller.run(parser.parse_args((make_url(),)))
-        self.assertEqual(1, os.fork.call_count)
-        self.assertEqual(1, os.execvpe.call_count)
-        os.setuid.assert_called_once_with(
-            start_cluster_controller.getpwnam.return_value.pw_uid)
-        os.setgid.assert_called_once_with(
-            start_cluster_controller.getpwnam.return_value.pw_gid)
+        self.assertEqual(1, start_cluster_controller.check_call.call_count)
 
     def test_uses_given_url(self):
         url = make_url('region')
@@ -227,10 +206,11 @@
             (server_url, connection_details))
 
     def test_start_up_calls_refresh_secrets(self):
+        start_cluster_controller.check_call.side_effect = None
+        start_cluster_controller.check_call.return_value = 0
+        start_cluster_controller.sleep.side_effect = None
         url = make_url('region')
         connection_details = self.make_connection_details()
-        self.pretend_to_fork_into_parent()
-        self.patch(start_cluster_controller, 'sleep')
         self.prepare_success_response()
 
         start_cluster_controller.start_up(
@@ -246,8 +226,9 @@
         self.assertEqual("refresh_workers", post["op"])
 
     def test_start_up_ignores_failure_on_refresh_secrets(self):
-        self.pretend_to_fork_into_parent()
-        self.patch(start_cluster_controller, 'sleep')
+        start_cluster_controller.check_call.side_effect = None
+        start_cluster_controller.check_call.return_value = 0
+        start_cluster_controller.sleep.side_effect = None
         self.patch(MAASDispatcher, 'dispatch_query').side_effect = URLError(
             "Simulated HTTP failure.")
 
@@ -255,4 +236,15 @@
             make_url(), self.make_connection_details(),
             factory.make_name('user'), factory.make_name('group'))
 
-        self.assertEqual(1, os.fork.call_count)
+        self.assertEqual(1, start_cluster_controller.check_call.call_count)
+
+    def test_start_celery_raises_if_celeryd_fails(self):
+        return_value = randint(1, 127)
+        start_cluster_controller.check_call.side_effect = None
+        start_cluster_controller.check_call.return_value = return_value
+        error = self.assertRaises(
+            SystemExit,
+            start_cluster_controller.start_celery,
+            self.make_connection_details(),
+            factory.make_name('user'), factory.make_name('group'))
+        self.assertEqual(return_value, error.code)

=== modified file 'src/provisioningserver/utils.py'
--- src/provisioningserver/utils.py	2012-09-25 05:21:24 +0000
+++ src/provisioningserver/utils.py	2012-10-03 09:30:46 +0000
@@ -376,7 +376,7 @@
         try:
             self.setup()
             self.execute(argv)
-        except CalledProcessError, error:
+        except CalledProcessError as error:
             # Print error.cmd and error.output too?
             raise SystemExit(error.returncode)
         except KeyboardInterrupt: