← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~jameinel/maas/tag-race-1065454 into lp:maas

 

John A Meinel has proposed merging lp:~jameinel/maas/tag-race-1065454 into lp:maas.

Commit message:
This fixes the race condition for bug #1065454 when tags get updated multiple times.

Each worker process posts the tag definition it was operating on, and the region controller checks that it matches the tag's current definition, returning CONFLICT if it does not. This is done in a way that doesn't create a lot of noise in the logs: only INFO-level messages are emitted. (If conflicts are happening all the time, you'd want to know, but you don't need to perform an action every time one occurs.)


Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #1065454 in MAAS: "Changing tag definition quickly could result in inconsistent data"
  https://bugs.launchpad.net/maas/+bug/1065454

For more details, see:
https://code.launchpad.net/~jameinel/maas/tag-race-1065454/+merge/131234

You can test this by creating lots of nodes with hardware data (say 1000) and then running "maas-cli M tag update definition='/node/x'; maas-cli M tag update definition='/node/y'".

Essentially, the first change will fire off workers using the first definition, and then the second change will fire off workers using the second definition.

Under normal circumstances, the second one will finish second, and thus 'win'. However, this patch makes sure of it: the worker posts the tag definition it was operating on, and the region controller returns a CONFLICT response if that definition no longer matches the tag's current definition (a response the worker knows how to handle, rather than failing with an exception).


-- 
https://code.launchpad.net/~jameinel/maas/tag-race-1065454/+merge/131234
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jameinel/maas/tag-race-1065454 into lp:maas.
=== modified file 'src/maasserver/api.py'
--- src/maasserver/api.py	2012-10-19 13:55:51 +0000
+++ src/maasserver/api.py	2012-10-24 16:36:20 +0000
@@ -1552,6 +1552,10 @@
 
         :param add: system_ids of nodes to add to this tag.
         :param remove: system_ids of nodes to remove from this tag.
+        :param definition: (optional) If supplied, the definition will be
+            validated against the current definition of the tag. If the value
+            does not match, then the update will be dropped (assuming this was
+            just a case of a worker being out-of-date)
         :param nodegroup: A uuid of a nodegroup being processed. This value is
             optional. If not supplied, the requester must be a superuser. If
             supplied, then the requester must be the worker associated with
@@ -1567,6 +1571,12 @@
                     'Must be a superuser or supply a nodegroup')
             nodegroup = get_one(NodeGroup.objects.filter(uuid=uuid))
             check_nodegroup_access(request, nodegroup)
+        definition = request.data.get('definition', None)
+        if definition is not None and tag.definition != definition:
+            return HttpResponse(
+                "Definition supplied '%s' doesn't match current definition '%s'"
+                % (definition, tag.definition),
+                status=httplib.CONFLICT)
         nodes_to_add = self._get_nodes_for(request, 'add', nodegroup)
         tag.node_set.add(*nodes_to_add)
         nodes_to_remove = self._get_nodes_for(request, 'remove', nodegroup)

=== modified file 'src/maasserver/tests/test_api.py'
--- src/maasserver/tests/test_api.py	2012-10-19 13:33:31 +0000
+++ src/maasserver/tests/test_api.py	2012-10-24 16:36:20 +0000
@@ -2866,6 +2866,24 @@
         self.assertEqual({'added': 0, 'removed': 0}, parsed_result)
         self.assertItemsEqual([], tag.node_set.all())
 
+    def test_POST_update_nodes_ignores_incorrect_definition(self):
+        tag = factory.make_tag()
+        orig_def = tag.definition
+        nodegroup = factory.make_node_group()
+        node = factory.make_node(nodegroup=nodegroup)
+        client = make_worker_client(nodegroup)
+        tag.definition = '//new/node/definition'
+        tag.save()
+        response = client.post(self.get_tag_uri(tag),
+            {'op': 'update_nodes',
+             'add': [node.system_id],
+             'nodegroup': nodegroup.uuid,
+             'definition': orig_def,
+            })
+        self.assertEqual(httplib.CONFLICT, response.status_code)
+        self.assertItemsEqual([], tag.node_set.all())
+        self.assertItemsEqual([], node.tags.all())
+
     def test_POST_rebuild_rebuilds_node_mapping(self):
         tag = factory.make_tag(definition='/foo/bar')
         # Only one node matches the tag definition, rebuilding should notice

=== modified file 'src/provisioningserver/tags.py'
--- src/provisioningserver/tags.py	2012-10-19 15:08:52 +0000
+++ src/provisioningserver/tags.py	2012-10-24 16:36:20 +0000
@@ -19,6 +19,7 @@
 
 
 import httplib
+import urllib2
 
 from apiclient.maas_client import (
     MAASClient,
@@ -104,11 +105,13 @@
         path, op='node_hardware_details', as_json=True, system_ids=system_ids))
 
 
-def post_updated_nodes(client, tag_name, uuid, added, removed):
+def post_updated_nodes(client, tag_name, tag_definition, uuid, added, removed):
     """Update the nodes relevant for a particular tag.
 
     :param client: MAAS client
     :param tag_name: Name of tag
+    :param tag_definition: Definition of the tag, used to assure that the work
+        being done matches the current value.
     :param uuid: NodeGroup uuid of this worker. Needed for security
         permissions. (The nodegroup worker is only allowed to touch nodes in
         its nodegroup, otherwise you need to be a superuser.)
@@ -118,9 +121,19 @@
     path = '/api/1.0/tags/%s/' % (tag_name,)
     task_logger.debug('Updating nodes for %s %s, adding %s removing %s'
         % (tag_name, uuid, len(added), len(removed)))
-    return process_response(client.post(
-        path, op='update_nodes', as_json=True,
-        nodegroup=uuid, add=added, remove=removed))
+    try:
+        return process_response(client.post(
+            path, op='update_nodes', as_json=True, nodegroup=uuid,
+            definition=tag_definition, add=added, remove=removed))
+    except urllib2.HTTPError as e:
+        if e.code == httplib.CONFLICT:
+            if e.fp is not None:
+                msg = e.fp.read()
+            else:
+                msg = e.msg
+            task_logger.info('Got a CONFLICT while updating tag: %s', msg)
+            return {}
+        raise
 
 
 def process_batch(xpath, hardware_details):
@@ -147,7 +160,7 @@
     return matched_nodes, unmatched_nodes
 
 
-def process_all(client, tag_name, nodegroup_uuid, system_ids, xpath,
+def process_all(client, tag_name, tag_definition, nodegroup_uuid, system_ids, xpath,
                 batch_size=None):
     if batch_size is None:
         batch_size = DEFAULT_BATCH_SIZE
@@ -171,7 +184,7 @@
     # This also allows us to track if a nodegroup has been processed in the DB,
     # without having to add another API call.
     post_updated_nodes(
-        client, tag_name, nodegroup_uuid, all_matched, all_unmatched)
+        client, tag_name, tag_definition, nodegroup_uuid, all_matched, all_unmatched)
 
 
 def process_node_tags(tag_name, tag_definition, batch_size=None):
@@ -192,5 +205,5 @@
     xpath = etree.XPath(tag_definition)
     # Get nodes to process
     system_ids = get_nodes_for_node_group(client, nodegroup_uuid)
-    process_all(client, tag_name, nodegroup_uuid, system_ids, xpath,
+    process_all(client, tag_name, tag_definition, nodegroup_uuid, system_ids, xpath,
                 batch_size=batch_size)

=== modified file 'src/provisioningserver/tests/test_tags.py'
--- src/provisioningserver/tests/test_tags.py	2012-10-10 09:14:11 +0000
+++ src/provisioningserver/tests/test_tags.py	2012-10-24 16:36:20 +0000
@@ -20,6 +20,7 @@
     FakeMethod,
     MultiFakeMethod,
     )
+import urllib2
 from mock import MagicMock
 from provisioningserver.auth import (
     get_recorded_nodegroup_uuid,
@@ -102,12 +103,38 @@
         post_mock = MagicMock(return_value=response)
         self.patch(client, 'post', post_mock)
         name = factory.make_name('tag')
-        result = tags.post_updated_nodes(client, name, uuid,
+        tag_definition = factory.make_name('//')
+        result = tags.post_updated_nodes(client, name, tag_definition, uuid,
             ['add-system-id'], ['remove-1', 'remove-2'])
         self.assertEqual({'added': 1, 'removed': 2}, result)
         url = '/api/1.0/tags/%s/' % (name,)
         post_mock.assert_called_once_with(
             url, op='update_nodes', as_json=True, nodegroup=uuid,
+            definition=tag_definition,
+            add=['add-system-id'], remove=['remove-1', 'remove-2'])
+
+    def test_post_updated_nodes_handles_conflict(self):
+        # If a worker started processing a node late, it might try to post
+        # an updated list with an out-of-date definition. It gets a CONFLICT in
+        # that case, which should be handled.
+        client, uuid = self.fake_cached_knowledge()
+        name = factory.make_name('tag')
+        right_tag_defintion = factory.make_name('//')
+        wrong_tag_definition = factory.make_name('//')
+        content = ("Definition supplied '%s' doesn't match"
+                   " current definition '%s'"
+                   % (wrong_tag_definition, right_tag_defintion))
+        err = urllib2.HTTPError('url', httplib.CONFLICT, content, {}, None)
+        post_mock = MagicMock(side_effect=err)
+        self.patch(client, 'post', post_mock)
+        result = tags.post_updated_nodes(client, name, wrong_tag_definition,
+            uuid, ['add-system-id'], ['remove-1', 'remove-2'])
+        # self.assertEqual({'added': 1, 'removed': 2}, result)
+        url = '/api/1.0/tags/%s/' % (name,)
+        self.assertEqual({}, result)
+        post_mock.assert_called_once_with(
+            url, op='update_nodes', as_json=True, nodegroup=uuid,
+            definition=wrong_tag_definition,
             add=['add-system-id'], remove=['remove-1', 'remove-2'])
 
     def test_process_batch_evaluates_xpath(self):
@@ -152,7 +179,8 @@
         self.patch(MAASClient, 'post', post_fake)
         tag_name = factory.make_name('tag')
         nodegroup_uuid = get_recorded_nodegroup_uuid()
-        tags.process_node_tags(tag_name, '//node')
+        tag_definition = '//node'
+        tags.process_node_tags(tag_name, tag_definition)
         nodegroup_url = '/api/1.0/nodegroups/%s/' % (nodegroup_uuid,)
         tag_url = '/api/1.0/tags/%s/' % (tag_name,)
         self.assertEqual([((nodegroup_url,), {'op': 'list_nodes'})],
@@ -166,6 +194,7 @@
                           {'as_json': True,
                            'op': 'update_nodes',
                            'nodegroup': nodegroup_uuid,
+                           'definition': tag_definition,
                            'add': ['system-id1'],
                            'remove': ['system-id2'],
                           })], post_update_fake.calls)
@@ -187,10 +216,11 @@
             MultiFakeMethod([fake_first, fake_second]))
         self.patch(tags, 'post_updated_nodes')
         tag_name = factory.make_name('tag')
-        tags.process_node_tags(tag_name, '//node', batch_size=2)
+        tag_definition = '//node'
+        tags.process_node_tags(tag_name, tag_definition, batch_size=2)
         tags.get_cached_knowledge.assert_called_once_with()
         tags.get_nodes_for_node_group.assert_called_once_with(client, uuid)
         self.assertEqual([((client, uuid, ['a', 'b']), {})], fake_first.calls)
         self.assertEqual([((client, uuid, ['c']), {})], fake_second.calls)
         tags.post_updated_nodes.assert_called_once_with(
-            client, tag_name, uuid, ['a', 'c'], ['b'])
+            client, tag_name, tag_definition, uuid, ['a', 'c'], ['b'])