launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #12921
[Merge] lp:~jameinel/maas/tag-updating into lp:maas
John A Meinel has proposed merging lp:~jameinel/maas/tag-updating into lp:maas.
Commit message:
Add a task for the provisioning_server that can update tags using the APIs we just added.
This lets us farm out all the work of processing 100,000 tags to the provisioning servers, which should allow us to evaluate a tag definition consistently in under 10s, even as we scale.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~jameinel/maas/tag-updating/+merge/127962
Add a task for the provisioning_server that can update tags using the APIs we just added.
This uses a lot of mocking to make sure the calls are correct, but I still need to add tests on the server side to verify that the whole flow actually works end-to-end.
If desired, I can hold off landing this until the next patch is complete, but I would like feedback on the mocking done here.
--
https://code.launchpad.net/~jameinel/maas/tag-updating/+merge/127962
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jameinel/maas/tag-updating into lp:maas.
=== modified file 'acceptance/Makefile'
--- acceptance/Makefile 2012-09-21 14:12:19 +0000
+++ acceptance/Makefile 2012-10-04 09:12:40 +0000
@@ -15,7 +15,7 @@
# $ make series=randy
#
-include /etc/lsb-release
+DISTRIB_CODENAME=unstable
# Default to the newer of Quantal or the local series.
series := $(lastword $(sort quantal $(DISTRIB_CODENAME)))
=== added file 'src/provisioningserver/tags.py'
--- src/provisioningserver/tags.py 1970-01-01 00:00:00 +0000
+++ src/provisioningserver/tags.py 2012-10-04 09:12:40 +0000
@@ -0,0 +1,146 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Celery jobs for managing tags.
+
+"""
+
+import httplib
+import json
+
+from lxml import etree
+
+from apiclient.maas_client import (
+ MAASClient,
+ MAASDispatcher,
+ MAASOAuth,
+ )
+
+from provisioningserver.auth import (
+ get_recorded_api_credentials,
+ get_recorded_maas_url,
+ get_recorded_nodegroup_uuid,
+ )
+
+from provisioningserver.logging import task_logger
+
+
+# Default number of system_ids that process_all handles per batch: one
+# hardware-details request and one update_nodes POST per batch.
+DEFAULT_BATCH_SIZE = 100
+
+
+def get_cached_knowledge():
+    """Get everything needed to talk to the MAAS API for this node group.
+
+    Reads the MAAS URL, API credentials and node-group UUID recorded via
+    `provisioningserver.auth`. If any of them has not been recorded yet, an
+    error is logged and (None, None) is returned; no exception is raised.
+
+    :return: (client, nodegroup_uuid), or (None, None) if any piece of
+        information is still missing.
+    """
+    maas_url = get_recorded_maas_url()
+    if maas_url is None:
+        task_logger.error("Not updating tags: don't have API URL yet.")
+        return None, None
+    api_credentials = get_recorded_api_credentials()
+    if api_credentials is None:
+        task_logger.error("Not updating tags: don't have API key yet.")
+        return None, None
+    nodegroup_uuid = get_recorded_nodegroup_uuid()
+    if nodegroup_uuid is None:
+        task_logger.error("Not updating tags: don't have UUID yet.")
+        return None, None
+    client = MAASClient(
+        MAASOAuth(*api_credentials), MAASDispatcher(), maas_url)
+    return client, nodegroup_uuid
+
+
+def _process_response(response):
+    """Return the JSON-decoded body of `response`, which must be 200 OK.
+
+    :param response: Result of `MAASClient.get` / `MAASClient.post`.
+    :raise httplib.HTTPException: If the status code is not 200 OK, so an
+        error page is never mistaken for API data.
+    :return: The decoded JSON document.
+    """
+    # NOTE(review): relies on the response exposing `status_code` and
+    # `content` (as the tests' FakeResponse does); confirm against what
+    # MAASDispatcher actually returns.
+    if response.status_code != httplib.OK:
+        raise httplib.HTTPException(
+            "Unexpected response status %s (expected %s): %r"
+            % (response.status_code, httplib.OK, response.content))
+    return json.loads(response.content)
+
+
+def get_nodes_for_node_group(client, nodegroup_uuid):
+    """Retrieve the system_ids of the nodes in a particular node group.
+
+    :param client: MAAS client instance
+    :param nodegroup_uuid: Node group for which to retrieve nodes
+    :raise httplib.HTTPException: If the API does not answer 200 OK.
+    :return: List of system_ids for nodes in the node group
+    """
+    path = 'api/1.0/nodegroup/%s/' % (nodegroup_uuid,)
+    response = client.get(path, op='list_nodes')
+    return _process_response(response)
+
+
+def get_hardware_details_for_nodes(client, nodegroup_uuid, system_ids):
+    """Retrieve the lshw output for a set of nodes.
+
+    :param client: MAAS client
+    :param nodegroup_uuid: Node group the nodes belong to
+    :param system_ids: List of system_ids of the nodes for which to fetch
+        lshw data
+    :raise httplib.HTTPException: If the API does not answer 200 OK.
+    :return: List of [system_id, lshw_xml] pairs
+    """
+    path = 'api/1.0/nodegroup/%s/' % (nodegroup_uuid,)
+    # TODO: Do we pass system_ids as a python list? Or do we json.dumps it
+    # first?
+    response = client.get(
+        path, op='node_hardware_details', system_ids=system_ids)
+    return _process_response(response)
+
+
+def update_node_tags(client, tag_name, uuid, added, removed):
+    """Update the nodes relevant for a particular tag.
+
+    :param client: MAAS client
+    :param tag_name: Name of tag
+    :param uuid: NodeGroup uuid of this worker. Needed for security
+        permissions. (The nodegroup worker is only allowed to touch nodes in
+        its nodegroup, otherwise you need to be a superuser.)
+    :param added: system_ids of nodes to add to the tag
+    :param removed: system_ids of nodes to remove from the tag
+    :raise httplib.HTTPException: If the API does not answer 200 OK.
+    :return: The decoded JSON response from the server
+    """
+    # NOTE(review): `uuid` is documented as needed for permissions but is
+    # not sent in the request below; confirm how the server identifies the
+    # calling node group.
+    path = 'api/1.0/tags/%s/' % (tag_name,)
+    response = client.post(path, op='update_nodes', add=added, remove=removed)
+    return _process_response(response)
+
+
+def process_batch(xpath, hardware_details):
+    """Evaluate a tag's XPath against the hardware details of one batch.
+
+    :param xpath: Compiled `etree.XPath` for the tag definition.
+    :param hardware_details: Sequence of (system_id, lshw XML string) pairs,
+        as returned by `get_hardware_details_for_nodes`.
+    :return: (matched_system_ids, unmatched_system_ids), each preserving the
+        input order.
+    """
+    matched_nodes = []
+    unmatched_nodes = []
+    for system_id, hw_xml in hardware_details:
+        xml = etree.XML(hw_xml)
+        # A node matches when the XPath result is truthy, e.g. a non-empty
+        # node set or a true boolean.
+        if xpath(xml):
+            matched_nodes.append(system_id)
+        else:
+            unmatched_nodes.append(system_id)
+    return matched_nodes, unmatched_nodes
+
+
+def process_all(client, tag_name, nodegroup_uuid, system_ids, xpath,
+                batch_size=None):
+    """Evaluate a tag against every node in the group, batch by batch.
+
+    For each consecutive slice of at most `batch_size` system_ids
+    (DEFAULT_BATCH_SIZE when None): fetch the hardware details, evaluate
+    `xpath`, and post the resulting additions/removals for `tag_name`.
+    """
+    size = DEFAULT_BATCH_SIZE if batch_size is None else batch_size
+    for start in range(0, len(system_ids), size):
+        batch = system_ids[start:start + size]
+        hardware_details = get_hardware_details_for_nodes(
+            client, nodegroup_uuid, batch)
+        matched, unmatched = process_batch(xpath, hardware_details)
+        # Posting after every batch, rather than once at the end, gives
+        # incremental progress and keeps each POST small.
+        update_node_tags(client, tag_name, nodegroup_uuid, matched, unmatched)
+
+
+def process_node_tags(tag_name, tag_definition, batch_size=None):
+    """Update the nodes for a new/changed tag definition.
+
+    If the secrets needed to reach the server have not been recorded yet,
+    an error is logged and nothing is done. The definition is compiled
+    before any node data is requested, so an invalid XPath fails early.
+
+    :param tag_name: Name of the tag to update nodes for
+    :param tag_definition: XPath expression defining the tag
+    :param batch_size: Number of nodes per batch (see process_all);
+        DEFAULT_BATCH_SIZE when None
+    """
+    client, nodegroup_uuid = get_cached_knowledge()
+    if not all([client, nodegroup_uuid]):
+        task_logger.error(
+            "Unable to update tag: %s for definition %r. "
+            "Please refresh secrets, then rebuild this tag.",
+            tag_name, tag_definition)
+        return
+    # Compile first, so we fail before sending a bunch of data to the server.
+    xpath = etree.XPath(tag_definition)
+    system_ids = get_nodes_for_node_group(client, nodegroup_uuid)
+    process_all(client, tag_name, nodegroup_uuid, system_ids, xpath,
+                batch_size=batch_size)
=== modified file 'src/provisioningserver/tasks.py'
--- src/provisioningserver/tasks.py 2012-09-29 23:49:08 +0000
+++ src/provisioningserver/tasks.py 2012-10-04 09:12:40 +0000
@@ -30,7 +30,10 @@
from celery.app import app_or_default
from celery.task import task
-from provisioningserver import boot_images
+from provisioningserver import (
+ boot_images,
+ tags,
+ )
from provisioningserver.auth import (
record_api_credentials,
record_maas_url,
@@ -335,3 +338,13 @@
def report_boot_images():
"""For master worker only: report available netboot images."""
boot_images.report_to_server()
+
+
+@task
+def update_node_tags(tag_name, tag_definition):
+    """Celery task: re-evaluate a tag definition and update its nodes.
+
+    Thin wrapper; the work is done by `tags.process_node_tags`.
+
+    :param tag_name: Name of the tag whose nodes should be updated
+    :param tag_definition: XPath expression defining the tag
+    """
+    tags.process_node_tags(tag_name=tag_name, tag_definition=tag_definition)
=== modified file 'src/provisioningserver/testing/testcase.py'
--- src/provisioningserver/testing/testcase.py 2012-08-24 02:51:42 +0000
+++ src/provisioningserver/testing/testcase.py 2012-10-04 09:12:40 +0000
@@ -14,7 +14,14 @@
'PservTestCase',
]
+from apiclient.testing.credentials import make_api_credentials
from maastesting import testcase
+from maastesting.factory import factory
+from provisioningserver.auth import (
+ record_api_credentials,
+ record_maas_url,
+ record_nodegroup_uuid,
+ )
from provisioningserver.testing.worker_cache import WorkerCacheFixture
@@ -23,3 +30,22 @@
def setUp(self):
super(PservTestCase, self).setUp()
+        # NOTE(review): presumably gives each test a fresh worker cache, so
+        # values recorded by the set_* helpers do not leak between tests.
self.useFixture(WorkerCacheFixture())
+
+    def make_maas_url(self):
+        """Return a MAAS URL on 127.0.0.1 with a factory-generated path."""
+        path = factory.make_name('path')
+        return 'http://127.0.0.1/%s' % path
+
+    def set_maas_url(self):
+        """Record a freshly made MAAS URL, as refresh_secrets would."""
+        url = self.make_maas_url()
+        record_maas_url(url)
+
+    def set_api_credentials(self):
+        """Record generated API credentials, joined with colons."""
+        credentials = make_api_credentials()
+        record_api_credentials(':'.join(credentials))
+
+    def set_node_group_uuid(self):
+        """Record a factory-generated node-group UUID."""
+        record_nodegroup_uuid(factory.make_name('nodegroupuuid'))
+
+    def set_secrets(self):
+        """Record every secret that refresh_secrets would provide.
+
+        That is, in order: the MAAS URL, the API credentials and the
+        node-group UUID.
+        """
+        self.set_maas_url()
+        self.set_api_credentials()
+        self.set_node_group_uuid()
=== modified file 'src/provisioningserver/tests/test_boot_images.py'
--- src/provisioningserver/tests/test_boot_images.py 2012-09-25 14:45:59 +0000
+++ src/provisioningserver/tests/test_boot_images.py 2012-10-04 09:12:40 +0000
@@ -15,14 +15,8 @@
import json
from apiclient.maas_client import MAASClient
-from apiclient.testing.credentials import make_api_credentials
-from maastesting.factory import factory
from mock import Mock
from provisioningserver import boot_images
-from provisioningserver.auth import (
- record_api_credentials,
- record_maas_url,
- )
from provisioningserver.pxe import tftppath
from provisioningserver.testing.boot_images import make_boot_image_params
from provisioningserver.testing.config import ConfigFixture
@@ -35,13 +29,6 @@
super(TestBootImagesTasks, self).setUp()
self.useFixture(ConfigFixture({'tftp': {'root': self.make_dir()}}))
- def set_maas_url(self):
- record_maas_url(
- 'http://127.0.0.1/%s' % factory.make_name('path'))
-
- def set_api_credentials(self):
- record_api_credentials(':'.join(make_api_credentials()))
-
def test_sends_boot_images_to_server(self):
self.set_maas_url()
self.set_api_credentials()
=== added file 'src/provisioningserver/tests/test_tags.py'
--- src/provisioningserver/tests/test_tags.py 1970-01-01 00:00:00 +0000
+++ src/provisioningserver/tests/test_tags.py 2012-10-04 09:12:40 +0000
@@ -0,0 +1,179 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for tag updating."""
+
+from __future__ import (
+ absolute_import,
+ print_function,
+ unicode_literals,
+ )
+
+__metaclass__ = type
+__all__ = []
+
+from apiclient.maas_client import MAASClient
+import httplib
+from lxml import etree
+from maastesting.factory import factory
+from maastesting.fakemethod import (
+ FakeMethod,
+ MultiFakeMethod,
+ )
+from mock import MagicMock
+from provisioningserver.auth import (
+ get_recorded_nodegroup_uuid,
+ )
+from provisioningserver.testing.testcase import PservTestCase
+from provisioningserver import tags
+
+
+class FakeResponse:
+    """Minimal stand-in for a MAASClient response: status code plus body."""
+
+    def __init__(self, status_code, content):
+        self.status_code, self.content = status_code, content
+
+
+class TestTagUpdating(PservTestCase):
+ # Tests for provisioningserver.tags. HTTP calls are faked: tests patch
+ # MAASClient.get/post with fakes returning FakeResponse objects, or patch
+ # the tags module's own helper functions directly.
+
+ def test_get_cached_knowledge_knows_nothing(self):
+ # If we haven't given it any secrets, we should get back nothing
+ self.assertEqual((None, None), tags.get_cached_knowledge())
+
+ def test_get_cached_knowledge_with_only_url(self):
+ self.set_maas_url()
+ self.assertEqual((None, None), tags.get_cached_knowledge())
+
+ def test_get_cached_knowledge_with_only_url_creds(self):
+ self.set_maas_url()
+ self.set_api_credentials()
+ self.assertEqual((None, None), tags.get_cached_knowledge())
+
+ def test_get_cached_knowledge_with_all_info(self):
+ self.set_maas_url()
+ self.set_api_credentials()
+ self.set_node_group_uuid()
+ client, uuid = tags.get_cached_knowledge()
+ self.assertIsNot(None, client)
+ self.assertIsInstance(client, MAASClient)
+ self.assertIsNot(None, uuid)
+ self.assertEqual(get_recorded_nodegroup_uuid(), uuid)
+
+ # Helpers: a MAASClient with no auth or dispatcher (its get/post are
+ # patched in each test that uses it), plus a made-up node-group UUID.
+ def fake_client(self):
+ return MAASClient(None, None, self.make_maas_url())
+
+ def fake_cached_knowledge(self):
+ nodegroup_uuid = factory.make_name('nodegroupuuid')
+ return self.fake_client(), nodegroup_uuid
+
+ def test_get_nodes_calls_correct_api_and_parses_result(self):
+ client, uuid = self.fake_cached_knowledge()
+ response = FakeResponse(httplib.OK, '["system-id1", "system-id2"]')
+ mock = MagicMock(return_value=response)
+ self.patch(client, 'get', mock)
+ result = tags.get_nodes_for_node_group(client, uuid)
+ self.assertEqual(['system-id1', 'system-id2'], result)
+ url = 'api/1.0/nodegroup/%s/' % (uuid,)
+ mock.assert_called_once_with(url, op='list_nodes')
+
+ def test_get_hardware_details_calls_correct_api_and_parses_result(self):
+ client, uuid = self.fake_cached_knowledge()
+ xml_data = "<test><data /></test>"
+ content = '[["system-id1", "%s"]]' % (xml_data,)
+ response = FakeResponse(httplib.OK, content)
+ mock = MagicMock(return_value=response)
+ self.patch(client, 'get', mock)
+ result = tags.get_hardware_details_for_nodes(
+ client, uuid, ['system-id1', 'system-id2'])
+ self.assertEqual([['system-id1', xml_data]], result)
+ url = 'api/1.0/nodegroup/%s/' % (uuid,)
+ mock.assert_called_once_with(
+ url, op='node_hardware_details',
+ system_ids=["system-id1", "system-id2"])
+
+ def test_update_node_tags_calls_correct_api_and_parses_result(self):
+ client, uuid = self.fake_cached_knowledge()
+ content = '{"added": 1, "removed": 2}'
+ response = FakeResponse(httplib.OK, content)
+ mock = MagicMock(return_value=response)
+ self.patch(client, 'post', mock)
+ name = factory.make_name('tag')
+ result = tags.update_node_tags(client, name, uuid,
+ ['add-system-id'], ['remove-1', 'remove-2'])
+ self.assertEqual({'added': 1, 'removed': 2}, result)
+ url = 'api/1.0/tags/%s/' % (name,)
+ mock.assert_called_once_with(
+ url, op='update_nodes',
+ add=['add-system-id'], remove=['remove-1', 'remove-2'])
+
+ def test_process_batch_evaluates_xpath(self):
+ # Yay, something that doesn't need patching...
+ xpath = etree.XPath('//node')
+ node_details = [['a', '<node />'],
+ ['b', '<not-node />'],
+ ['c', '<parent><node /></parent>'],
+ ]
+ self.assertEqual(
+ (['a', 'c'], ['b']),
+ tags.process_batch(xpath, node_details))
+
+ def test_process_node_tags_no_secrets(self):
+ # Without recorded secrets, no GET or POST may be issued.
+ self.patch(MAASClient, 'get')
+ self.patch(MAASClient, 'post')
+ tag_name = factory.make_name('tag')
+ tags.process_node_tags(tag_name, '//node')
+ self.assertFalse(MAASClient.get.called)
+ self.assertFalse(MAASClient.post.called)
+
+ # Drives process_node_tags with faked GET/POST responses (the two GETs
+ # are served in sequence by MultiFakeMethod) and checks the exact
+ # requests that were made.
+ def test_process_node_tags_integration(self):
+ self.set_secrets()
+ get_nodes = FakeMethod(
+ result=FakeResponse(httplib.OK, '["system-id1", "system-id2"]'))
+ get_hw_details = FakeMethod(
+ result=FakeResponse(httplib.OK,
+ '[["system-id1", "<node />"], ["system-id2", "<no-node />"]]'))
+ get_fake = MultiFakeMethod([get_nodes, get_hw_details])
+ post_fake = FakeMethod(
+ result=FakeResponse(httplib.OK, '{"added": 1, "removed": 1}'))
+ self.patch(MAASClient, 'get', get_fake)
+ self.patch(MAASClient, 'post', post_fake)
+ tag_name = factory.make_name('tag')
+ nodegroup_uuid = get_recorded_nodegroup_uuid()
+ tags.process_node_tags(tag_name, '//node')
+ nodegroup_url = 'api/1.0/nodegroup/%s/' % (nodegroup_uuid,)
+ tag_url = 'api/1.0/tags/%s/' % (tag_name,)
+ self.assertEqual([((nodegroup_url,), {'op': 'list_nodes'})],
+ get_nodes.calls)
+ self.assertEqual([((nodegroup_url,),
+ {'op': 'node_hardware_details',
+ 'system_ids': ['system-id1', 'system-id2']})],
+ get_hw_details.calls)
+ self.assertEqual([((tag_url,),
+ {'op': 'update_nodes',
+ 'add': ['system-id1'],
+ 'remove': ['system-id2'],
+ })], post_fake.calls)
+
+ # With batch_size=2 and three nodes, hardware details must be requested
+ # in two calls: first for ['a', 'b'], then for ['c'].
+ def test_process_node_tags_requests_details_in_batches(self):
+ client = object()
+ uuid = factory.make_name('nodegroupuuid')
+ self.patch(
+ tags, 'get_cached_knowledge',
+ MagicMock(return_value=(client, uuid)))
+ self.patch(
+ tags, 'get_nodes_for_node_group',
+ MagicMock(return_value=['a', 'b', 'c']))
+ fake_first = FakeMethod(
+ result=[['a', '<node />'], ['b', '<not-node />']])
+ fake_second = FakeMethod(
+ result=[['c', '<parent><node /></parent>']])
+ self.patch(tags, 'get_hardware_details_for_nodes',
+ MultiFakeMethod([fake_first, fake_second]))
+ self.patch(tags, 'update_node_tags')
+ tag_name = factory.make_name('tag')
+ tags.process_node_tags(tag_name, '//node', batch_size=2)
+ tags.get_cached_knowledge.assert_called_once_with()
+ tags.get_nodes_for_node_group.assert_called_once_with(client, uuid)
+ self.assertEqual([((client, uuid, ['a', 'b']), {})], fake_first.calls)
+ self.assertEqual([((client, uuid, ['c']), {})], fake_second.calls)