launchpad-reviewers team mailing list archive
Message #12855
[Merge] lp:~jtv/maas/bug-1059453 into lp:maas
Jeroen T. Vermeulen has proposed merging lp:~jtv/maas/bug-1059453 into lp:maas.
Commit message:
Run celeryd synchronously, so Upstart doesn't need to track fork()s (many of which precede the operative one as maas-provision starts up).
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #1059453 in MAAS: "The celery cluster worker is not properly stopped"
https://bugs.launchpad.net/maas/+bug/1059453
For more details, see:
https://code.launchpad.net/~jtv/maas/bug-1059453/+merge/127680
Discussed with Julian and many others. The "maas-provision start-cluster-controller" command will now keep running until either the cluster controller is rejected by an administrator or celeryd exits for whatever reason. A sketch of the new invocation pattern follows the proposal text below, before the diff.
Jeroen
--
https://code.launchpad.net/~jtv/maas/bug-1059453/+merge/127680
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jtv/maas/bug-1059453 into lp:maas.
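
The heart of the change, sketched in isolation: the fork()/execvpe() dance is replaced by a single synchronous subprocess call, with the uid/gid switch moved into a preexec_fn callable. The snippet below is an illustration, not code from this branch; the command, uid and gid are placeholders, and like the real script it assumes enough privilege to call setgid()/setuid().

    import os
    from subprocess import check_call

    # Placeholder values; the real command, uid and gid come from the
    # cluster controller's configuration.
    command = ['celeryd', '--loglevel=INFO']
    uid, gid = 1000, 1000

    def become():
        # Drop the group first: once the uid is gone we may no longer
        # have the privilege to call setgid().
        os.setgid(gid)
        os.setuid(uid)

    # Old pattern (removed by this branch): fork, switch user in the
    # child, exec celeryd.  The parent returns at once, so Upstart has
    # to follow the fork()s to find the daemon's pid.
    #
    #     pid = os.fork()
    #     if pid == 0:
    #         become()
    #         os.execvpe(command[0], command, os.environ)
    #
    # New pattern: run celeryd in the foreground.  preexec_fn runs in
    # the child between fork() and exec(), so the uid/gid switch never
    # touches the parent; check_call() blocks until celeryd exits and
    # raises CalledProcessError on a non-zero exit status.
    check_call(command, env=dict(os.environ), preexec_fn=become)
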
=== modified file 'src/provisioningserver/start_cluster_controller.py'
--- src/provisioningserver/start_cluster_controller.py 2012-10-02 10:20:17 +0000
+++ src/provisioningserver/start_cluster_controller.py 2012-10-03 09:30:46 +0000
@@ -19,6 +19,7 @@
import json
import os
from pwd import getpwnam
+from subprocess import check_call
from time import sleep
from urllib2 import (
    HTTPError,
@@ -121,6 +122,20 @@
        raise AssertionError("Unexpected return code: %r" % status_code)


+class Become:
+    """Set user and group id."""
+
+    def __init__(self, uid, gid):
+        self.uid = uid
+        self.gid = gid
+
+    def __call__(self):
+        # Change gid first, just in case changing the uid might deprive
+        # us of the privileges required to setgid.
+        os.setgid(self.gid)
+        os.setuid(self.uid)
+
+
def start_celery(connection_details, user, group):
    broker_url = connection_details['BROKER_URL']
    uid = getpwnam(user).pw_uid
@@ -137,12 +152,9 @@
        '-Q', get_cluster_uuid(),
        ]

-    pid = os.fork()
-    if pid == 0:
-        # Child process. Become the right user, and start celery.
-        os.setuid(uid)
-        os.setgid(gid)
-        os.execvpe(command[0], command, env)
+    return_code = check_call(command, env=env, preexec_fn=Become(uid, gid))
+    if return_code != 0:
+        raise SystemExit(return_code)


def request_refresh(server_url):
@@ -161,9 +173,12 @@
    This starts up celeryd, listening to the broker that the region
    controller pointed us to, and on the appropriate queue.
    """
+    # Get the region controller to send out credentials. If it arrives
+    # before celeryd has started up, we should find the message waiting
+    # in our queue. Even if we're new and the queue did not exist yet,
+    # the arriving task will create the queue.
+    request_refresh(server_url)
    start_celery(connection_details, user=user, group=group)
-    sleep(10)
-    request_refresh(server_url)


def run(args):
=== modified file 'src/provisioningserver/tests/test_start_cluster_controller.py'
--- src/provisioningserver/tests/test_start_cluster_controller.py 2012-10-02 10:20:17 +0000
+++ src/provisioningserver/tests/test_start_cluster_controller.py 2012-10-03 09:30:46 +0000
@@ -17,7 +17,6 @@
import httplib
from io import BytesIO
import json
-import os
from random import randint
from urllib2 import (
    HTTPError,
@@ -87,10 +86,8 @@
        self.patch(start_cluster_controller, 'getpwnam')
        start_cluster_controller.getpwnam.pw_uid = randint(3000, 4000)
        start_cluster_controller.getpwnam.pw_gid = randint(3000, 4000)
-        self.patch(os, 'fork').side_effect = Executing()
-        self.patch(os, 'execvpe').side_effect = Executing()
-        self.patch(os, 'setuid')
-        self.patch(os, 'setgid')
+        self.patch(
+            start_cluster_controller, 'check_call').side_effect = Executing()
        get_uuid = self.patch(start_cluster_controller, 'get_cluster_uuid')
        get_uuid.return_value = factory.getRandomUUID()
@@ -133,36 +130,18 @@
"""Prepare to return "request pending" from API request."""
self.prepare_response(httplib.ACCEPTED)
- def pretend_to_fork_into_child(self):
- """Make `fork` act as if it's returning into the child process.
-
- The start_cluster_controller child process then executes celeryd,
- so this call also patches up the call that does that so it pretends
- to be successful.
- """
- self.patch(os, 'fork').return_value = 0
- self.patch(os, 'execvpe')
-
- def pretend_to_fork_into_parent(self):
- """Make `fork` act as if it's returning into the parent process."""
- self.patch(os, 'fork').return_value = randint(2, 65535)
-
def test_run_command(self):
# We can't really run the script, but we can verify that (with
# the right system functions patched out) we can run it
# directly.
- self.pretend_to_fork_into_child()
- self.patch(start_cluster_controller, 'sleep')
+ start_cluster_controller.sleep.side_effect = None
+ start_cluster_controller.check_call.side_effect = None
+ start_cluster_controller.check_call.return_value = 0
self.prepare_success_response()
parser = ArgumentParser()
start_cluster_controller.add_arguments(parser)
start_cluster_controller.run(parser.parse_args((make_url(),)))
- self.assertEqual(1, os.fork.call_count)
- self.assertEqual(1, os.execvpe.call_count)
- os.setuid.assert_called_once_with(
- start_cluster_controller.getpwnam.return_value.pw_uid)
- os.setgid.assert_called_once_with(
- start_cluster_controller.getpwnam.return_value.pw_gid)
+ self.assertEqual(1, start_cluster_controller.check_call.call_count)
def test_uses_given_url(self):
url = make_url('region')
@@ -227,10 +206,11 @@
            (server_url, connection_details))

    def test_start_up_calls_refresh_secrets(self):
+        start_cluster_controller.check_call.side_effect = None
+        start_cluster_controller.check_call.return_value = 0
+        start_cluster_controller.sleep.side_effect = None
        url = make_url('region')
        connection_details = self.make_connection_details()
-        self.pretend_to_fork_into_parent()
-        self.patch(start_cluster_controller, 'sleep')
        self.prepare_success_response()
        start_cluster_controller.start_up(
@@ -246,8 +226,9 @@
self.assertEqual("refresh_workers", post["op"])
def test_start_up_ignores_failure_on_refresh_secrets(self):
- self.pretend_to_fork_into_parent()
- self.patch(start_cluster_controller, 'sleep')
+ start_cluster_controller.check_call.side_effect = None
+ start_cluster_controller.check_call.return_value = 0
+ start_cluster_controller.sleep.side_effect = None
self.patch(MAASDispatcher, 'dispatch_query').side_effect = URLError(
"Simulated HTTP failure.")
@@ -255,4 +236,15 @@
            make_url(), self.make_connection_details(),
            factory.make_name('user'), factory.make_name('group'))
-        self.assertEqual(1, os.fork.call_count)
+        self.assertEqual(1, start_cluster_controller.check_call.call_count)
+
+    def test_start_celery_raises_if_celeryd_fails(self):
+        return_value = randint(1, 127)
+        start_cluster_controller.check_call.side_effect = None
+        start_cluster_controller.check_call.return_value = return_value
+        error = self.assertRaises(
+            SystemExit,
+            start_cluster_controller.start_celery,
+            self.make_connection_details(),
+            factory.make_name('user'), factory.make_name('group'))
+        self.assertEqual(return_value, error.code)
=== modified file 'src/provisioningserver/utils.py'
--- src/provisioningserver/utils.py 2012-09-25 05:21:24 +0000
+++ src/provisioningserver/utils.py 2012-10-03 09:30:46 +0000
@@ -376,7 +376,7 @@
        try:
            self.setup()
            self.execute(argv)
-        except CalledProcessError, error:
+        except CalledProcessError as error:
            # Print error.cmd and error.output too?
            raise SystemExit(error.returncode)
        except KeyboardInterrupt:
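
Taken together, the two files give the error path a single shape: check_call() raises CalledProcessError when celeryd exits non-zero, and the handler in utils.py turns that into a SystemExit carrying the same return code, so the exit status the caller (and Upstart) sees is celeryd's own. Below is a minimal, self-contained sketch of that flow, with illustrative names rather than the actual MAAS entry points.

    from subprocess import CalledProcessError, check_call

    def run_worker(command):
        # check_call() blocks until the command finishes and raises
        # CalledProcessError if it exits with a non-zero status.
        check_call(command)

    def main(command):
        try:
            run_worker(command)
        except CalledProcessError as error:
            # Mirrors the utils.py handler: exit with the child's own
            # return code.
            raise SystemExit(error.returncode)

    if __name__ == '__main__':
        main(['true'])  # substitute the real celeryd command line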