← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~jameinel/maas/tag-updating into lp:maas

 

John A Meinel has proposed merging lp:~jameinel/maas/tag-updating into lp:maas.

Commit message:
Add a task for the provisioning_server that can update tags using the APIs we just added.

This lets us farm out all the work of processing 100,000 tags to the provisioning_servers, which should allow us to consistently evaluate a Tag definition in under 10s, even as we scale.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~jameinel/maas/tag-updating/+merge/127962

Add a task for the provisioning_server that can update tags using the APIs we just added.

This uses a lot of mocking to make sure the calls are correct, but I'll still need to add testing on the other side to confirm that it actually works end-to-end.

I can wait to land this until the next patch is complete if so desired, but I would like to get feedback on the mock work that was done here.


-- 
https://code.launchpad.net/~jameinel/maas/tag-updating/+merge/127962
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jameinel/maas/tag-updating into lp:maas.
=== modified file 'acceptance/Makefile'
--- acceptance/Makefile	2012-09-21 14:12:19 +0000
+++ acceptance/Makefile	2012-10-04 09:12:40 +0000
@@ -15,7 +15,7 @@
 #   $ make series=randy
 #
 
-include /etc/lsb-release
+DISTRIB_CODENAME=unstable
 
 # Default to the newer of Quantal or the local series.
 series := $(lastword $(sort quantal $(DISTRIB_CODENAME)))

=== added file 'src/provisioningserver/tags.py'
--- src/provisioningserver/tags.py	1970-01-01 00:00:00 +0000
+++ src/provisioningserver/tags.py	2012-10-04 09:12:40 +0000
@@ -0,0 +1,146 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Celery jobs for managing tags.
+
+"""
+
+import json
+from lxml import etree
+
+from apiclient.maas_client import (
+    MAASClient,
+    MAASDispatcher,
+    MAASOAuth,
+    )
+
+from provisioningserver.auth import (
+    get_recorded_api_credentials,
+    get_recorded_maas_url,
+    get_recorded_nodegroup_uuid,
+    )
+
+from provisioningserver.logging import task_logger
+
+
+DEFAULT_BATCH_SIZE = 100
+
+
+def get_cached_knowledge():
+    """Get all the information that we need to know, or raise an error.
+
+    :return: (client, nodegroup_uuid)
+    """
+    maas_url = get_recorded_maas_url()
+    if maas_url is None:
+        task_logger.error("Not updating tags: don't have API URL yet.")
+        return None, None
+    api_credentials = get_recorded_api_credentials()
+    if api_credentials is None:
+        task_logger.error("Not updating tags: don't have API key yet.")
+        return None, None
+    nodegroup_uuid = get_recorded_nodegroup_uuid()
+    if nodegroup_uuid is None:
+        task_logger.error("Not updating tags: don't have UUID yet.")
+        return None, None
+    client = MAASClient(MAASOAuth(*api_credentials), MAASDispatcher(),
+        maas_url)
+    return client, nodegroup_uuid
+
+
+def get_nodes_for_node_group(client, nodegroup_uuid):
+    """Retrieve the UUIDs of nodes in a particular group.
+
+    :param client: MAAS client instance
+    :param nodegroup_uuid: Node group for which to retrieve nodes
+    :return: List of UUIDs for nodes in nodegroup
+    """
+    path = 'api/1.0/nodegroup/%s/' % (nodegroup_uuid)
+    response = client.get(path, op='list_nodes')
+    # XXX: Check the response code before we parse the content
+    return json.loads(response.content)
+
+
+def get_hardware_details_for_nodes(client, nodegroup_uuid, system_ids):
+    """Retrieve the lshw output for a set of nodes.
+
+    :param client: MAAS client
+    :param system_ids: List of UUIDs of systems for which to fetch lshw data
+    :return: Dictionary mapping node UUIDs to lshw output
+    """
+    path = 'api/1.0/nodegroup/%s/' % (nodegroup_uuid,)
+    # TODO: Do we pass system_ids as a python list? Or do we json.dumps it
+    #       first?
+    response = client.get(
+        path, op='node_hardware_details', system_ids=system_ids)
+    # XXX: Check the response code before we parse the content
+    return json.loads(response.content)
+
+
+def update_node_tags(client, tag_name, uuid, added, removed):
+    """Update the nodes relevant for a particular tag.
+
+    :param client: MAAS client
+    :param tag_name: Name of tag
+    :param uuid: NodeGroup uuid of this worker. Needed for security
+        permissions. (The nodegroup worker is only allowed to touch nodes in
+        its nodegroup, otherwise you need to be a superuser.)
+    :param added: Set of nodes to add
+    :param removed: Set of nodes to remove
+    """
+    path = 'api/1.0/tags/%s/' % (tag_name,)
+    response = client.post(path, op='update_nodes', add=added, remove=removed)
+    # XXX: Check the response code before we parse the content
+    return json.loads(response.content)
+
+
+def process_batch(xpath, hardware_details):
+    """Get the details for one batch, and process whether they match or not.
+    """
+    # Fetch node XML in batches
+    matched_nodes = []
+    unmatched_nodes = []
+    for system_id, hw_xml in hardware_details:
+        xml = etree.XML(hw_xml)
+        if xpath(xml):
+            matched_nodes.append(system_id)
+        else:
+            unmatched_nodes.append(system_id)
+    return matched_nodes, unmatched_nodes
+
+
+def process_all(client, tag_name, nodegroup_uuid, system_ids, xpath,
+                batch_size=None):
+    if batch_size is None:
+        batch_size = DEFAULT_BATCH_SIZE
+    for i in range(0, len(system_ids), batch_size):
+        selected_ids = system_ids[i:i + batch_size]
+        details = get_hardware_details_for_nodes(
+            client, nodegroup_uuid, selected_ids)
+        matched, unmatched = process_batch(xpath, details)
+        # JAM 2012-10-04 If we wanted, we could defer the update_nodes until
+        #       after everything was processed, but this lets us do incremental
+        #       improvements, and avoids POSTing 4MB of info at once.
+        update_node_tags(client, tag_name, nodegroup_uuid, matched, unmatched)
+
+
+def process_node_tags(tag_name, tag_definition, batch_size=None):
+    """Update the nodes for a new/changed tag definition.
+
+    :param tag_name: Name of the tag to update nodes for
+    :param tag_definition: Tag definition
+    :param batch_size: Size of batch
+    """
+    client, nodegroup_uuid = get_cached_knowledge()
+    if not all([client, nodegroup_uuid]):
+        task_logger.error('Unable to update tag: %s for definition %r'
+            ' please refresh secrets, then rebuild this tag'
+            % (tag_name, tag_definition))
+        return
+    # We evaluate this early, so we can fail before sending a bunch of data to
+    # the server
+    xpath = etree.XPath(tag_definition)
+    # Get nodes to process
+    system_ids = get_nodes_for_node_group(client, nodegroup_uuid)
+    process_all(client, tag_name, nodegroup_uuid, system_ids, xpath,
+                batch_size=batch_size)

=== modified file 'src/provisioningserver/tasks.py'
--- src/provisioningserver/tasks.py	2012-09-29 23:49:08 +0000
+++ src/provisioningserver/tasks.py	2012-10-04 09:12:40 +0000
@@ -30,7 +30,10 @@
 
 from celery.app import app_or_default
 from celery.task import task
-from provisioningserver import boot_images
+from provisioningserver import (
+    boot_images,
+    tags,
+    )
 from provisioningserver.auth import (
     record_api_credentials,
     record_maas_url,
@@ -335,3 +338,13 @@
 def report_boot_images():
     """For master worker only: report available netboot images."""
     boot_images.report_to_server()
+
+
+@task
+def update_node_tags(tag_name, tag_definition):
+    """Update the nodes for a new/changed tag definition.
+
+    :param tag_name: Name of the tag to update nodes for
+    :param tag_definition: Tag definition
+    """
+    tags.process_node_tags(tag_name, tag_definition)

=== modified file 'src/provisioningserver/testing/testcase.py'
--- src/provisioningserver/testing/testcase.py	2012-08-24 02:51:42 +0000
+++ src/provisioningserver/testing/testcase.py	2012-10-04 09:12:40 +0000
@@ -14,7 +14,14 @@
     'PservTestCase',
     ]
 
+from apiclient.testing.credentials import make_api_credentials
 from maastesting import testcase
+from maastesting.factory import factory
+from provisioningserver.auth import (
+    record_api_credentials,
+    record_maas_url,
+    record_nodegroup_uuid,
+    )
 from provisioningserver.testing.worker_cache import WorkerCacheFixture
 
 
@@ -23,3 +30,22 @@
     def setUp(self):
         super(PservTestCase, self).setUp()
         self.useFixture(WorkerCacheFixture())
+
+    def make_maas_url(self):
+        return 'http://127.0.0.1/%s' % factory.make_name('path')
+
+    def set_maas_url(self):
+        record_maas_url(self.make_maas_url())
+
+    def set_api_credentials(self):
+        record_api_credentials(':'.join(make_api_credentials()))
+
+    def set_node_group_uuid(self):
+        nodegroup_uuid = factory.make_name('nodegroupuuid')
+        record_nodegroup_uuid(nodegroup_uuid)
+
+    def set_secrets(self):
+        """Setup all secrets that we would get from refresh_secrets."""
+        self.set_maas_url()
+        self.set_api_credentials()
+        self.set_node_group_uuid()

=== modified file 'src/provisioningserver/tests/test_boot_images.py'
--- src/provisioningserver/tests/test_boot_images.py	2012-09-25 14:45:59 +0000
+++ src/provisioningserver/tests/test_boot_images.py	2012-10-04 09:12:40 +0000
@@ -15,14 +15,8 @@
 import json
 
 from apiclient.maas_client import MAASClient
-from apiclient.testing.credentials import make_api_credentials
-from maastesting.factory import factory
 from mock import Mock
 from provisioningserver import boot_images
-from provisioningserver.auth import (
-    record_api_credentials,
-    record_maas_url,
-    )
 from provisioningserver.pxe import tftppath
 from provisioningserver.testing.boot_images import make_boot_image_params
 from provisioningserver.testing.config import ConfigFixture
@@ -35,13 +29,6 @@
         super(TestBootImagesTasks, self).setUp()
         self.useFixture(ConfigFixture({'tftp': {'root': self.make_dir()}}))
 
-    def set_maas_url(self):
-        record_maas_url(
-            'http://127.0.0.1/%s' % factory.make_name('path'))
-
-    def set_api_credentials(self):
-        record_api_credentials(':'.join(make_api_credentials()))
-
     def test_sends_boot_images_to_server(self):
         self.set_maas_url()
         self.set_api_credentials()

=== added file 'src/provisioningserver/tests/test_tags.py'
--- src/provisioningserver/tests/test_tags.py	1970-01-01 00:00:00 +0000
+++ src/provisioningserver/tests/test_tags.py	2012-10-04 09:12:40 +0000
@@ -0,0 +1,179 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for tag updating."""
+
+from __future__ import (
+    absolute_import,
+    print_function,
+    unicode_literals,
+    )
+
+__metaclass__ = type
+__all__ = []
+
+from apiclient.maas_client import MAASClient
+import httplib
+from lxml import etree
+from maastesting.factory import factory
+from maastesting.fakemethod import (
+    FakeMethod,
+    MultiFakeMethod,
+    )
+from mock import MagicMock
+from provisioningserver.auth import (
+    get_recorded_nodegroup_uuid,
+    )
+from provisioningserver.testing.testcase import PservTestCase
+from provisioningserver import tags
+
+
+class FakeResponse:
+
+    def __init__(self, status_code, content):
+        self.status_code = status_code
+        self.content = content
+
+
+class TestTagUpdating(PservTestCase):
+
+    def test_get_cached_knowledge_knows_nothing(self):
+        # If we haven't given it any secrets, we should get back nothing
+        self.assertEqual((None, None), tags.get_cached_knowledge())
+
+    def test_get_cached_knowledge_with_only_url(self):
+        self.set_maas_url()
+        self.assertEqual((None, None), tags.get_cached_knowledge())
+
+    def test_get_cached_knowledge_with_only_url_creds(self):
+        self.set_maas_url()
+        self.set_api_credentials()
+        self.assertEqual((None, None), tags.get_cached_knowledge())
+
+    def test_get_cached_knowledge_with_all_info(self):
+        self.set_maas_url()
+        self.set_api_credentials()
+        self.set_node_group_uuid()
+        client, uuid = tags.get_cached_knowledge()
+        self.assertIsNot(None, client)
+        self.assertIsInstance(client, MAASClient)
+        self.assertIsNot(None, uuid)
+        self.assertEqual(get_recorded_nodegroup_uuid(), uuid)
+
+    def fake_client(self):
+        return MAASClient(None, None, self.make_maas_url())
+
+    def fake_cached_knowledge(self):
+        nodegroup_uuid = factory.make_name('nodegroupuuid')
+        return self.fake_client(), nodegroup_uuid
+
+    def test_get_nodes_calls_correct_api_and_parses_result(self):
+        client, uuid = self.fake_cached_knowledge()
+        response = FakeResponse(httplib.OK, '["system-id1", "system-id2"]')
+        mock = MagicMock(return_value=response)
+        self.patch(client, 'get', mock)
+        result = tags.get_nodes_for_node_group(client, uuid)
+        self.assertEqual(['system-id1', 'system-id2'], result)
+        url = 'api/1.0/nodegroup/%s/' % (uuid,)
+        mock.assert_called_once_with(url, op='list_nodes')
+
+    def test_get_hardware_details_calls_correct_api_and_parses_result(self):
+        client, uuid = self.fake_cached_knowledge()
+        xml_data = "<test><data /></test>"
+        content = '[["system-id1", "%s"]]' % (xml_data,)
+        response = FakeResponse(httplib.OK, content)
+        mock = MagicMock(return_value=response)
+        self.patch(client, 'get', mock)
+        result = tags.get_hardware_details_for_nodes(
+            client, uuid, ['system-id1', 'system-id2'])
+        self.assertEqual([['system-id1', xml_data]], result)
+        url = 'api/1.0/nodegroup/%s/' % (uuid,)
+        mock.assert_called_once_with(
+            url, op='node_hardware_details',
+            system_ids=["system-id1", "system-id2"])
+
+    def test_update_node_tags_calls_correct_api_and_parses_result(self):
+        client, uuid = self.fake_cached_knowledge()
+        content = '{"added": 1, "removed": 2}'
+        response = FakeResponse(httplib.OK, content)
+        mock = MagicMock(return_value=response)
+        self.patch(client, 'post', mock)
+        name = factory.make_name('tag')
+        result = tags.update_node_tags(client, name, uuid,
+            ['add-system-id'], ['remove-1', 'remove-2'])
+        self.assertEqual({'added': 1, 'removed': 2}, result)
+        url = 'api/1.0/tags/%s/' % (name,)
+        mock.assert_called_once_with(
+            url, op='update_nodes',
+            add=['add-system-id'], remove=['remove-1', 'remove-2'])
+
+    def test_process_batch_evaluates_xpath(self):
+        # Yay, something that doesn't need patching...
+        xpath = etree.XPath('//node')
+        node_details = [['a', '<node />'],
+                        ['b', '<not-node />'],
+                        ['c', '<parent><node /></parent>'],
+                       ]
+        self.assertEqual(
+            (['a', 'c'], ['b']),
+            tags.process_batch(xpath, node_details))
+
+    def test_process_node_tags_no_secrets(self):
+        self.patch(MAASClient, 'get')
+        self.patch(MAASClient, 'post')
+        tag_name = factory.make_name('tag')
+        tags.process_node_tags(tag_name, '//node')
+        self.assertFalse(MAASClient.get.called)
+        self.assertFalse(MAASClient.post.called)
+
+    def test_process_node_tags_integration(self):
+        self.set_secrets()
+        get_nodes = FakeMethod(
+            result=FakeResponse(httplib.OK, '["system-id1", "system-id2"]'))
+        get_hw_details = FakeMethod(
+            result=FakeResponse(httplib.OK,
+                '[["system-id1", "<node />"], ["system-id2", "<no-node />"]]'))
+        get_fake = MultiFakeMethod([get_nodes, get_hw_details])
+        post_fake = FakeMethod(
+            result=FakeResponse(httplib.OK, '{"added": 1, "removed": 1}'))
+        self.patch(MAASClient, 'get', get_fake)
+        self.patch(MAASClient, 'post', post_fake)
+        tag_name = factory.make_name('tag')
+        nodegroup_uuid = get_recorded_nodegroup_uuid()
+        tags.process_node_tags(tag_name, '//node')
+        nodegroup_url = 'api/1.0/nodegroup/%s/' % (nodegroup_uuid,)
+        tag_url = 'api/1.0/tags/%s/' % (tag_name,)
+        self.assertEqual([((nodegroup_url,), {'op': 'list_nodes'})],
+                         get_nodes.calls)
+        self.assertEqual([((nodegroup_url,),
+                          {'op': 'node_hardware_details',
+                           'system_ids': ['system-id1', 'system-id2']})],
+                         get_hw_details.calls)
+        self.assertEqual([((tag_url,),
+                          {'op': 'update_nodes',
+                           'add': ['system-id1'],
+                           'remove': ['system-id2'],
+                          })], post_fake.calls)
+
+    def test_process_node_tags_requests_details_in_batches(self):
+        client = object()
+        uuid = factory.make_name('nodegroupuuid')
+        self.patch(
+            tags, 'get_cached_knowledge',
+            MagicMock(return_value=(client, uuid)))
+        self.patch(
+            tags, 'get_nodes_for_node_group',
+            MagicMock(return_value=['a', 'b', 'c']))
+        fake_first = FakeMethod(
+            result=[['a', '<node />'], ['b', '<not-node />']])
+        fake_second = FakeMethod(
+            result=[['c', '<parent><node /></parent>']])
+        self.patch(tags, 'get_hardware_details_for_nodes',
+            MultiFakeMethod([fake_first, fake_second]))
+        self.patch(tags, 'update_node_tags')
+        tag_name = factory.make_name('tag')
+        tags.process_node_tags(tag_name, '//node', batch_size=2)
+        tags.get_cached_knowledge.assert_called_once_with()
+        tags.get_nodes_for_node_group.assert_called_once_with(client, uuid)
+        self.assertEqual([((client, uuid, ['a', 'b']), {})], fake_first.calls)
+        self.assertEqual([((client, uuid, ['c']), {})], fake_second.calls)