launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #13678
[Merge] lp:~jameinel/maas/tag-race-1065454 into lp:maas
John A Meinel has proposed merging lp:~jameinel/maas/tag-race-1065454 into lp:maas.
Commit message:
This fixes the race condition for bug #1065454 when tags get updated multiple times.
The worker processes post the tag definition that they were operating with, and the region controller checks that it matches the current definition, returning CONFLICT if it does not. This is done in a way that doesn't create a lot of noise in the logs, just some INFO level messages. (If it is happening all the time, you'd want to know, but you don't need to perform an action every time it occurs.)
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #1065454 in MAAS: "Changing tag definition quickly could result in inconsistent data"
https://bugs.launchpad.net/maas/+bug/1065454
For more details, see:
https://code.launchpad.net/~jameinel/maas/tag-race-1065454/+merge/131234
You can test this by creating lots of nodes with hardware data (say 1000) and then running "maas-cli M tag update definition='/node/x'; maas-cli M tag update definition='/node/y'".
Essentially, the first change will fire off workers using the first definition, and then the second change will fire off workers using the second definition.
Under normal circumstances, the second one will finish second, and thus 'win'. However, this patch guarantees that outcome by having the worker post the tag definition it was operating on; if that definition is no longer the current one, the region controller returns a CONFLICT response (which the worker knows how to handle, rather than failing with an exception).
--
https://code.launchpad.net/~jameinel/maas/tag-race-1065454/+merge/131234
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~jameinel/maas/tag-race-1065454 into lp:maas.
=== modified file 'src/maasserver/api.py'
--- src/maasserver/api.py 2012-10-19 13:55:51 +0000
+++ src/maasserver/api.py 2012-10-24 16:36:20 +0000
@@ -1552,6 +1552,10 @@
:param add: system_ids of nodes to add to this tag.
:param remove: system_ids of nodes to remove from this tag.
+ :param definition: (optional) If supplied, the definition will be
+ validated against the current definition of the tag. If the value
+ does not match, then the update will be dropped (assuming this was
+ just a case of a worker being out-of-date)
:param nodegroup: A uuid of a nodegroup being processed. This value is
optional. If not supplied, the requester must be a superuser. If
supplied, then the requester must be the worker associated with
@@ -1567,6 +1571,12 @@
'Must be a superuser or supply a nodegroup')
nodegroup = get_one(NodeGroup.objects.filter(uuid=uuid))
check_nodegroup_access(request, nodegroup)
+ definition = request.data.get('definition', None)
+ if definition is not None and tag.definition != definition:
+ return HttpResponse(
+ "Definition supplied '%s' doesn't match current definition '%s'"
+ % (definition, tag.definition),
+ status=httplib.CONFLICT)
nodes_to_add = self._get_nodes_for(request, 'add', nodegroup)
tag.node_set.add(*nodes_to_add)
nodes_to_remove = self._get_nodes_for(request, 'remove', nodegroup)
=== modified file 'src/maasserver/tests/test_api.py'
--- src/maasserver/tests/test_api.py 2012-10-19 13:33:31 +0000
+++ src/maasserver/tests/test_api.py 2012-10-24 16:36:20 +0000
@@ -2866,6 +2866,24 @@
self.assertEqual({'added': 0, 'removed': 0}, parsed_result)
self.assertItemsEqual([], tag.node_set.all())
+ def test_POST_update_nodes_ignores_incorrect_definition(self):
+ tag = factory.make_tag()
+ orig_def = tag.definition
+ nodegroup = factory.make_node_group()
+ node = factory.make_node(nodegroup=nodegroup)
+ client = make_worker_client(nodegroup)
+ tag.definition = '//new/node/definition'
+ tag.save()
+ response = client.post(self.get_tag_uri(tag),
+ {'op': 'update_nodes',
+ 'add': [node.system_id],
+ 'nodegroup': nodegroup.uuid,
+ 'definition': orig_def,
+ })
+ self.assertEqual(httplib.CONFLICT, response.status_code)
+ self.assertItemsEqual([], tag.node_set.all())
+ self.assertItemsEqual([], node.tags.all())
+
def test_POST_rebuild_rebuilds_node_mapping(self):
tag = factory.make_tag(definition='/foo/bar')
# Only one node matches the tag definition, rebuilding should notice
=== modified file 'src/provisioningserver/tags.py'
--- src/provisioningserver/tags.py 2012-10-19 15:08:52 +0000
+++ src/provisioningserver/tags.py 2012-10-24 16:36:20 +0000
@@ -19,6 +19,7 @@
import httplib
+import urllib2
from apiclient.maas_client import (
MAASClient,
@@ -104,11 +105,13 @@
path, op='node_hardware_details', as_json=True, system_ids=system_ids))
-def post_updated_nodes(client, tag_name, uuid, added, removed):
+def post_updated_nodes(client, tag_name, tag_definition, uuid, added, removed):
"""Update the nodes relevant for a particular tag.
:param client: MAAS client
:param tag_name: Name of tag
+ :param tag_definition: Definition of the tag, used to assure that the work
+ being done matches the current value.
:param uuid: NodeGroup uuid of this worker. Needed for security
permissions. (The nodegroup worker is only allowed to touch nodes in
its nodegroup, otherwise you need to be a superuser.)
@@ -118,9 +121,19 @@
path = '/api/1.0/tags/%s/' % (tag_name,)
task_logger.debug('Updating nodes for %s %s, adding %s removing %s'
% (tag_name, uuid, len(added), len(removed)))
- return process_response(client.post(
- path, op='update_nodes', as_json=True,
- nodegroup=uuid, add=added, remove=removed))
+ try:
+ return process_response(client.post(
+ path, op='update_nodes', as_json=True, nodegroup=uuid,
+ definition=tag_definition, add=added, remove=removed))
+ except urllib2.HTTPError as e:
+ if e.code == httplib.CONFLICT:
+ if e.fp is not None:
+ msg = e.fp.read()
+ else:
+ msg = e.msg
+ task_logger.info('Got a CONFLICT while updating tag: %s', msg)
+ return {}
+ raise
def process_batch(xpath, hardware_details):
@@ -147,7 +160,7 @@
return matched_nodes, unmatched_nodes
-def process_all(client, tag_name, nodegroup_uuid, system_ids, xpath,
+def process_all(client, tag_name, tag_definition, nodegroup_uuid, system_ids, xpath,
batch_size=None):
if batch_size is None:
batch_size = DEFAULT_BATCH_SIZE
@@ -171,7 +184,7 @@
# This also allows us to track if a nodegroup has been processed in the DB,
# without having to add another API call.
post_updated_nodes(
- client, tag_name, nodegroup_uuid, all_matched, all_unmatched)
+ client, tag_name, tag_definition, nodegroup_uuid, all_matched, all_unmatched)
def process_node_tags(tag_name, tag_definition, batch_size=None):
@@ -192,5 +205,5 @@
xpath = etree.XPath(tag_definition)
# Get nodes to process
system_ids = get_nodes_for_node_group(client, nodegroup_uuid)
- process_all(client, tag_name, nodegroup_uuid, system_ids, xpath,
+ process_all(client, tag_name, tag_definition, nodegroup_uuid, system_ids, xpath,
batch_size=batch_size)
=== modified file 'src/provisioningserver/tests/test_tags.py'
--- src/provisioningserver/tests/test_tags.py 2012-10-10 09:14:11 +0000
+++ src/provisioningserver/tests/test_tags.py 2012-10-24 16:36:20 +0000
@@ -20,6 +20,7 @@
FakeMethod,
MultiFakeMethod,
)
+import urllib2
from mock import MagicMock
from provisioningserver.auth import (
get_recorded_nodegroup_uuid,
@@ -102,12 +103,38 @@
post_mock = MagicMock(return_value=response)
self.patch(client, 'post', post_mock)
name = factory.make_name('tag')
- result = tags.post_updated_nodes(client, name, uuid,
+ tag_definition = factory.make_name('//')
+ result = tags.post_updated_nodes(client, name, tag_definition, uuid,
['add-system-id'], ['remove-1', 'remove-2'])
self.assertEqual({'added': 1, 'removed': 2}, result)
url = '/api/1.0/tags/%s/' % (name,)
post_mock.assert_called_once_with(
url, op='update_nodes', as_json=True, nodegroup=uuid,
+ definition=tag_definition,
+ add=['add-system-id'], remove=['remove-1', 'remove-2'])
+
+ def test_post_updated_nodes_handles_conflict(self):
+ # If a worker started processing a node late, it might try to post
+ # an updated list with an out-of-date definition. It gets a CONFLICT in
+ # that case, which should be handled.
+ client, uuid = self.fake_cached_knowledge()
+ name = factory.make_name('tag')
+ right_tag_defintion = factory.make_name('//')
+ wrong_tag_definition = factory.make_name('//')
+ content = ("Definition supplied '%s' doesn't match"
+ " current definition '%s'"
+ % (wrong_tag_definition, right_tag_defintion))
+ err = urllib2.HTTPError('url', httplib.CONFLICT, content, {}, None)
+ post_mock = MagicMock(side_effect=err)
+ self.patch(client, 'post', post_mock)
+ result = tags.post_updated_nodes(client, name, wrong_tag_definition,
+ uuid, ['add-system-id'], ['remove-1', 'remove-2'])
+ # self.assertEqual({'added': 1, 'removed': 2}, result)
+ url = '/api/1.0/tags/%s/' % (name,)
+ self.assertEqual({}, result)
+ post_mock.assert_called_once_with(
+ url, op='update_nodes', as_json=True, nodegroup=uuid,
+ definition=wrong_tag_definition,
add=['add-system-id'], remove=['remove-1', 'remove-2'])
def test_process_batch_evaluates_xpath(self):
@@ -152,7 +179,8 @@
self.patch(MAASClient, 'post', post_fake)
tag_name = factory.make_name('tag')
nodegroup_uuid = get_recorded_nodegroup_uuid()
- tags.process_node_tags(tag_name, '//node')
+ tag_definition = '//node'
+ tags.process_node_tags(tag_name, tag_definition)
nodegroup_url = '/api/1.0/nodegroups/%s/' % (nodegroup_uuid,)
tag_url = '/api/1.0/tags/%s/' % (tag_name,)
self.assertEqual([((nodegroup_url,), {'op': 'list_nodes'})],
@@ -166,6 +194,7 @@
{'as_json': True,
'op': 'update_nodes',
'nodegroup': nodegroup_uuid,
+ 'definition': tag_definition,
'add': ['system-id1'],
'remove': ['system-id2'],
})], post_update_fake.calls)
@@ -187,10 +216,11 @@
MultiFakeMethod([fake_first, fake_second]))
self.patch(tags, 'post_updated_nodes')
tag_name = factory.make_name('tag')
- tags.process_node_tags(tag_name, '//node', batch_size=2)
+ tag_definition = '//node'
+ tags.process_node_tags(tag_name, tag_definition, batch_size=2)
tags.get_cached_knowledge.assert_called_once_with()
tags.get_nodes_for_node_group.assert_called_once_with(client, uuid)
self.assertEqual([((client, uuid, ['a', 'b']), {})], fake_first.calls)
self.assertEqual([((client, uuid, ['c']), {})], fake_second.calls)
tags.post_updated_nodes.assert_called_once_with(
- client, tag_name, uuid, ['a', 'c'], ['b'])
+ client, tag_name, tag_definition, uuid, ['a', 'c'], ['b'])