launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #12400
[Merge] lp:~jtv/maas/start-cluster-controller into lp:maas
Jeroen T. Vermeulen has proposed merging lp:~jtv/maas/start-cluster-controller into lp:maas.
Commit message:
New maas-provision command: start-cluster-controller.
Calls the region controller to register itself as a cluster controller. Then uses the RabbitMQ details which the region controller sends back to start its own celeryd. Polls for a definite answer: has the cluster controller been accepted or rejected?
Requested reviews:
MAAS Maintainers (maas-maintainers)
For more details, see:
https://code.launchpad.net/~jtv/maas/start-cluster-controller/+merge/126031
See commit message. Pre-imps for various aspects with Julian, Gavin, and Raphael.
The new command takes one argument: the URL for the region controller. It calls the "register" API operation there, which is what returns RabbitMQ connection information once the requesting cluster controller has been approved.
My code tells celery which broker to contact by setting the CELERY_BROKER_URL environment variable, which is new in celery 2.4. It overrides any URL that may have been set in the celeryconfig. The URL we used to set was the default anyway; there really is no reason to mention it in the config now.
Since cluster-controller registration is an unauthenticated API call, I added an alternative to MAASOAuth called NoAuth which fills the same slot in MAASClient but does not do any authentication. We may end up wanting to use that in more places.
In the future we'll also want to pass this a queue name, but it has no dedicated place on the cluster controller yet. Also, for now, the master cluster uses a special value "celery" as a queue name. We'll want to resolve that.
With this in place, we can drop our Upstart script for celery from the packaging branch. To replace it, we'll want something that calls "maas-provision start-cluster-controller <region-controller-url>". The parameter will come from a branch that Julian is currently working on.
Jeroen
--
https://code.launchpad.net/~jtv/maas/start-cluster-controller/+merge/126031
Your team MAAS Maintainers is requested to review the proposed merge of lp:~jtv/maas/start-cluster-controller into lp:maas.
=== modified file 'etc/celeryconfig.py'
--- etc/celeryconfig.py 2012-09-24 06:06:47 +0000
+++ etc/celeryconfig.py 2012-09-24 15:27:30 +0000
@@ -43,10 +43,6 @@
# List of interfaces that the dhcpd should service (if managed by MAAS).
DHCP_INTERFACES_FILE = '/var/lib/maas/dhcpd-interfaces'
-# Broker connection information.
-# Format: transport://userid:password@hostname:port/virtual_host
-BROKER_URL = 'amqp://guest:guest@localhost:5672//'
-
try:
import maas_local_celeryconfig
except ImportError:
=== modified file 'src/apiclient/maas_client.py'
--- src/apiclient/maas_client.py 2012-09-12 19:56:23 +0000
+++ src/apiclient/maas_client.py 2012-09-24 15:27:30 +0000
@@ -47,6 +47,19 @@
headers.update(oauth_request.to_header())
+class NoAuth:
+ """Dummy authentication class for making unauthenticated requests."""
+
+ def __init__(self, *args, **kwargs):
+ pass
+
+ def sign_request(self, *args, **kwargs):
+ """Go through the motions of signing a request.
+
+ Since this class does not really authenticate, this does nothing.
+ """
+
+
class MAASDispatcher:
"""Helper class to connect to a MAAS server using blocking requests.
=== modified file 'src/provisioningserver/__main__.py'
--- src/provisioningserver/__main__.py 2012-09-10 03:38:15 +0000
+++ src/provisioningserver/__main__.py 2012-09-24 15:27:30 +0000
@@ -16,6 +16,7 @@
import provisioningserver.dhcp.writer
import provisioningserver.pxe.install_bootloader
import provisioningserver.pxe.install_image
+import provisioningserver.start_cluster_controller
from provisioningserver.utils import (
AtomicWriteScript,
MainScript,
@@ -28,6 +29,7 @@
'generate-dhcp-config': provisioningserver.dhcp.writer,
'install-pxe-bootloader': provisioningserver.pxe.install_bootloader,
'install-pxe-image': provisioningserver.pxe.install_image,
+ 'start-cluster-controller': provisioningserver.start_cluster_controller,
}
=== added file 'src/provisioningserver/start_cluster_controller.py'
--- src/provisioningserver/start_cluster_controller.py 1970-01-01 00:00:00 +0000
+++ src/provisioningserver/start_cluster_controller.py 2012-09-24 15:27:30 +0000
@@ -0,0 +1,136 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Command: start the cluster controller."""
+
+from __future__ import (
+ absolute_import,
+ print_function,
+ unicode_literals,
+ )
+
+__metaclass__ = type
+__all__ = [
+ 'add_arguments',
+ 'run',
+ ]
+
+import httplib
+import json
+import os
+from subprocess import check_call
+import sys
+from time import sleep
+from urllib2 import (
+ HTTPError,
+ URLError,
+ )
+
+from apiclient.maas_client import (
+ MAASClient,
+ MAASDispatcher,
+ NoAuth,
+ )
+from provisioningserver.logging import task_logger
+
+
+class ClusterControllerRejected(Exception):
+ """Request to become a cluster controller has been rejected."""
+
+
+def add_arguments(parser):
+ """For use by :class:`MainScript`."""
+ parser.add_argument(
+ 'server_url', metavar='URL', help="URL to the MAAS region controller.")
+
+
+def log_error(exception):
+ task_logger.info(
+ "Could not register with region controller: %s."
+ % exception.reason)
+
+
+def register(server_url):
+ """Request Rabbit connection details from the region controller.
+
+ Offers this machine to the region controller as a potential cluster
+ controller.
+
+ :return: A dict of connection details if this cluster controller has been
+ accepted, or `None` if there is no definite response yet. If there
+ is no definite response, retry this call later.
+ :raise ClusterControllerRejected: if this system has been rejected as a
+ cluster controller.
+ """
+ known_responses = [httplib.OK, httplib.FORBIDDEN, httplib.ACCEPTED]
+ client = MAASClient(NoAuth(), MAASDispatcher(), server_url)
+ try:
+ response = client.post('api/1.0/nodegroups', 'register')
+ except HTTPError as e:
+ status_code = e.code
+ if e.code not in known_responses:
+ log_error(e)
+ # Unknown error. Keep trying.
+ return None
+ except URLError as e:
+ log_error(e)
+ # Unknown error. Keep trying.
+ return None
+ else:
+ status_code = response.getcode()
+
+ if status_code == httplib.OK:
+ # Our application has been approved. Proceed.
+ return json.loads(response.read())
+ elif status_code == httplib.ACCEPTED:
+ # Our application is still waiting for approval. Keep trying.
+ return None
+ elif status_code == httplib.FORBIDDEN:
+ # Our application has been rejected. Give up.
+ raise ClusterControllerRejected(
+ "This system has been rejected as a cluster controller.")
+ else:
+ raise AssertionError("Unexpected return code: %r" % status_code)
+
+
+def start_up(connection_details):
+ """We've been accepted as a cluster controller; start doing the job.
+
+ This starts up celeryd, listening to the broker that the region
+ controller pointed us to, and on the appropriate queue.
+ """
+ broker_url = connection_details['BROKER_URL']
+
+ # XXX JeroenVermeulen 2012-09-24, bug=1055523: Fill in proper
+ # cluster-specific queue name once we have those (based on cluster
+ # uuid).
+ queue = 'celery'
+
+ env = {
+ # Tell celeryd what broker to listen to.
+ 'CELERY_BROKER_URL': broker_url,
+ 'PATH': os.environ['PATH'],
+ 'PYTHONPATH': ':'.join(sys.path),
+ }
+
+ check_call([
+ 'celeryd',
+ '--logfile=/var/log/maas/celery.log',
+ '--loglevel=INFO',
+ '--beat', '--schedule=/var/lib/maas/celerybeat-schedule',
+ '-Q', "%s,common" % queue,
+ ],
+ env=env)
+
+
+def run(args):
+ """Start the cluster controller.
+
+ If this system is still awaiting approval as a cluster controller, this
+ command will keep looping until it gets a definite answer.
+ """
+ connection_details = register(args.server_url)
+ while connection_details is None:
+ sleep(60)
+ connection_details = register(args.server_url)
+ start_up(connection_details)
=== added file 'src/provisioningserver/tests/test_start_cluster_controller.py'
--- src/provisioningserver/tests/test_start_cluster_controller.py 1970-01-01 00:00:00 +0000
+++ src/provisioningserver/tests/test_start_cluster_controller.py 2012-09-24 15:27:30 +0000
@@ -0,0 +1,141 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for the `start_cluster_controller` command."""
+
+from __future__ import (
+ absolute_import,
+ print_function,
+ unicode_literals,
+ )
+
+__metaclass__ = type
+__all__ = []
+
+from argparse import ArgumentParser
+from collections import namedtuple
+import httplib
+from io import BytesIO
+import json
+from urllib2 import HTTPError
+
+from apiclient.maas_client import MAASDispatcher
+from maastesting.factory import factory
+from mock import Mock
+from provisioningserver import start_cluster_controller
+from provisioningserver.testing.testcase import PservTestCase
+
+
+class Sleeping(Exception):
+ """Exception: `sleep` has been called."""
+
+
+FakeArgs = namedtuple('FakeArgs', ['server_url'])
+
+
+class FakeURLOpenResponse:
+ """Cheap simile of a `urlopen` result."""
+
+ def __init__(self, content, status=httplib.OK):
+ self._content = content
+ self._status_code = status
+
+ def read(self):
+ return self._content
+
+ def getcode(self):
+ return self._status_code
+
+
+def make_url(name_hint='host'):
+ return "http://%s.example.com/%s/" % (
+ factory.make_name(name_hint),
+ factory.make_name('path'),
+ )
+
+
+class TestStartClusterController(PservTestCase):
+
+ def setUp(self):
+ super(TestStartClusterController, self).setUp()
+ self.patch(
+ start_cluster_controller, 'sleep', Mock(side_effect=Sleeping()))
+
+ def make_connection_details(self):
+ return {
+ 'BROKER_URL': make_url('broker'),
+ }
+
+ def prepare_response(self, http_code, content=""):
+ """Prepare to return the given http response from API request."""
+ self.patch(
+ MAASDispatcher, 'dispatch_query',
+ Mock(return_value=FakeURLOpenResponse(content, status=http_code)))
+
+ def prepare_success_response(self):
+ """Prepare to return connection details from API request."""
+ details = self.make_connection_details()
+ self.prepare_response(httplib.OK, json.dumps(details))
+ return details
+
+ def prepare_rejection_response(self):
+ """Prepare to return "rejected" from API request."""
+ self.prepare_response(httplib.FORBIDDEN)
+
+ def prepare_pending_response(self):
+ """Prepare to return "request pending" from API request."""
+ self.prepare_response(httplib.ACCEPTED)
+
+ def test_run_command(self):
+ # We can't really run the script, but we can verify that (with
+ # the right system functions patched out) we can run it
+ # directly.
+ self.patch(start_cluster_controller, 'check_call')
+ self.prepare_success_response()
+ parser = ArgumentParser()
+ start_cluster_controller.add_arguments(parser)
+ start_cluster_controller.run(parser.parse_args((make_url(), )))
+ self.assertNotEqual(0, start_cluster_controller.check_call.call_count)
+
+ def test_uses_given_url(self):
+ url = make_url('region')
+ self.patch(start_cluster_controller, 'start_up')
+ self.prepare_success_response()
+ start_cluster_controller.run(FakeArgs(url))
+ (args, kwargs) = MAASDispatcher.dispatch_query.call_args
+ self.assertEqual(url + 'api/1.0/nodegroups', args[0])
+
+ def test_fails_if_declined(self):
+ self.patch(start_cluster_controller, 'start_up')
+ self.prepare_rejection_response()
+ self.assertRaises(
+ start_cluster_controller.ClusterControllerRejected,
+ start_cluster_controller.run, FakeArgs(make_url()))
+ self.assertItemsEqual([], start_cluster_controller.start_up.calls_list)
+
+ def test_polls_while_pending(self):
+ self.patch(start_cluster_controller, 'start_up')
+ self.prepare_pending_response()
+ self.assertRaises(
+ Sleeping,
+ start_cluster_controller.run, FakeArgs(make_url()))
+ self.assertItemsEqual([], start_cluster_controller.start_up.calls_list)
+
+ def test_polls_on_unexpected_errors(self):
+ self.patch(start_cluster_controller, 'start_up')
+ self.patch(
+ MAASDispatcher, 'dispatch_query',
+ Mock(side_effect=HTTPError(
+ make_url(), httplib.REQUEST_TIMEOUT, "Timeout.", '',
+ BytesIO())))
+ self.assertRaises(
+ Sleeping,
+ start_cluster_controller.run, FakeArgs(make_url()))
+ self.assertItemsEqual([], start_cluster_controller.start_up.calls_list)
+
+ def test_starts_up_once_accepted(self):
+ self.patch(start_cluster_controller, 'start_up')
+ connection_details = self.prepare_success_response()
+ start_cluster_controller.run(FakeArgs(make_url()))
+ start_cluster_controller.start_up.assert_called_once_with(
+ connection_details)
Follow ups