[Merge] ~chad.smith/cloud-init:feature/kvp-reporting into cloud-init:master

 

Chad Smith has proposed merging ~chad.smith/cloud-init:feature/kvp-reporting into cloud-init:master.

Commit message:
logging: Add logging config type hyperv for reporting via Azure KVP

Linux guests can provide information to Hyper-V hosts via KVP.
KVP allows a guest to report arbitrary string key-value pairs back to the
host's registry. On Linux, the KVP communication pools are presented as
pool files in /var/lib/hyperv/.kvp_pool_#.

The following reporting configuration enables this KVP reporting, in
addition to the default logging, when the pool files exist:

reporting:
    logging:
        type: log
    telemetry:
        type: hyperv
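
For reference, each pool file is a sequence of fixed-size records: a
512-byte key followed by a 2048-byte value, both null-padded utf-8 (the
same HV_KVP_EXCHANGE_MAX_* sizes used by the handler in the diff below).
A minimal sketch of decoding such a pool file outside of cloud-init,
assuming that layout:

    import struct

    KEY_SIZE = 512      # HV_KVP_EXCHANGE_MAX_KEY_SIZE
    VALUE_SIZE = 2048   # HV_KVP_EXCHANGE_MAX_VALUE_SIZE
    RECORD_SIZE = KEY_SIZE + VALUE_SIZE

    def decode_record(record):
        # each record is a null-padded utf-8 key followed by a
        # null-padded utf-8 value
        key, value = struct.unpack(
            '%ds%ds' % (KEY_SIZE, VALUE_SIZE), record)
        return (key.decode('utf-8').strip('\x00'),
                value.decode('utf-8').strip('\x00'))

    with open('/var/lib/hyperv/.kvp_pool_1', 'rb') as f:
        record = f.read(RECORD_SIZE)
        while len(record) == RECORD_SIZE:
            print(decode_record(record))
            record = f.read(RECORD_SIZE)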



Requested reviews:
  cloud-init commiters (cloud-init-dev)

For more details, see:
https://code.launchpad.net/~chad.smith/cloud-init/+git/cloud-init/+merge/353739
-- 
Your team cloud-init commiters is requested to review the proposed merge of ~chad.smith/cloud-init:feature/kvp-reporting into cloud-init:master.
diff --git a/cloudinit/cloud.py b/cloudinit/cloud.py
index 6d12c43..7ae98e1 100644
--- a/cloudinit/cloud.py
+++ b/cloudinit/cloud.py
@@ -47,7 +47,7 @@ class Cloud(object):
 
     @property
     def cfg(self):
-        # Ensure that not indirectly modified
+        # Ensure that cfg is not indirectly modified
         return copy.deepcopy(self._cfg)
 
     def run(self, name, functor, args, freq=None, clear_on_fail=False):
@@ -61,7 +61,7 @@ class Cloud(object):
             return None
         return fn
 
-    # The rest of thes are just useful proxies
+    # The rest of these are just useful proxies
     def get_userdata(self, apply_filter=True):
         return self.datasource.get_userdata(apply_filter)
 
diff --git a/cloudinit/cmd/main.py b/cloudinit/cmd/main.py
index d6ba90f..c0edee1 100644
--- a/cloudinit/cmd/main.py
+++ b/cloudinit/cmd/main.py
@@ -315,7 +315,7 @@ def main_init(name, args):
                 existing = "trust"
 
         init.purge_cache()
-        # Delete the non-net file as well
+        # Delete the no-net file as well
         util.del_file(os.path.join(path_helper.get_cpath("data"), "no-net"))
 
     # Stage 5
@@ -339,7 +339,7 @@ def main_init(name, args):
                               " Likely bad things to come!"))
         if not args.force:
             init.apply_network_config(bring_up=not args.local)
-            LOG.debug("[%s] Exiting without datasource in local mode", mode)
+            LOG.debug("[%s] Exiting without datasource", mode)
             if mode == sources.DSMODE_LOCAL:
                 return (None, [])
             else:
diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py
index ab0b077..fde054e 100755
--- a/cloudinit/distros/__init__.py
+++ b/cloudinit/distros/__init__.py
@@ -157,7 +157,7 @@ class Distro(object):
                     distro)
         header = '\n'.join([
             "# Converted from network_config for distro %s" % distro,
-            "# Implmentation of _write_network_config is needed."
+            "# Implementation of _write_network_config is needed."
         ])
         ns = network_state.parse_net_config_data(netconfig)
         contents = eni.network_state_to_eni(
diff --git a/cloudinit/reporting/__init__.py b/cloudinit/reporting/__init__.py
index 1ed2b48..e047767 100644
--- a/cloudinit/reporting/__init__.py
+++ b/cloudinit/reporting/__init__.py
@@ -18,7 +18,7 @@ DEFAULT_CONFIG = {
 
 
 def update_configuration(config):
-    """Update the instanciated_handler_registry.
+    """Update the instantiated_handler_registry.
 
     :param config:
         The dictionary containing changes to apply.  If a key is given
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index 4066076..4b4bb39 100644
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -1,17 +1,34 @@
 # This file is part of cloud-init. See LICENSE file for license information.
 
 import abc
+import fcntl
 import json
 import six
+import os
+import re
+import struct
+import threading
+import time
 
 from cloudinit import log as logging
 from cloudinit.registry import DictRegistry
 from cloudinit import (url_helper, util)
+from datetime import datetime
 
+if six.PY2:
+    import multiprocessing.queues as queue
+    from multiprocessing.queues import JoinableQueue as JQueue
+else:
+    import queue
+    from queue import Queue as JQueue
 
 LOG = logging.getLogger(__name__)
 
 
+class ReportException(Exception):
+    pass
+
+
 @six.add_metaclass(abc.ABCMeta)
 class ReportingHandler(object):
     """Base class for report handlers.
@@ -85,9 +102,247 @@ class WebHookHandler(ReportingHandler):
             LOG.warning("failed posting event: %s", event.as_string())
 
 
+class HyperVKvpReportingHandler(ReportingHandler):
+    """
+    Reports events to a Hyper-V host using the Key-Value-Pair exchange
+    protocol, letting the host collect high-level diagnostics from the guest.
+
+    To use this facility, the KVP user-space daemon (hv_kvp_daemon) has to be
+    running. It reads the kvp_file when the host requests the guest to
+    enumerate the KVPs.
+
+    This reporter stores each event as a single json string in its own KVP
+    entry, keyed by the event's type and name.
+
+    For more information, see
+    https://technet.microsoft.com/en-us/library/dn798287.aspx#Linux%20guests
+    """
+    HV_KVP_EXCHANGE_MAX_VALUE_SIZE = 2048
+    HV_KVP_EXCHANGE_MAX_KEY_SIZE = 512
+    HV_KVP_RECORD_SIZE = (HV_KVP_EXCHANGE_MAX_KEY_SIZE +
+                          HV_KVP_EXCHANGE_MAX_VALUE_SIZE)
+    EVENT_PREFIX = 'CLOUD_INIT'
+    MSG_KEY = 'msg'
+    RESULT_KEY = 'result'
+    DESC_IDX_KEY = 'msg_i'
+    JSON_SEPARATORS = (',', ':')
+    KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1'
+
+    def __init__(self,
+                 kvp_file_path=KVP_POOL_FILE_GUEST,
+                 event_types=None):
+        super(HyperVKvpReportingHandler, self).__init__()
+        self._kvp_file_path = kvp_file_path
+        self._event_types = event_types
+        self.running = False
+        self.queue_lock = threading.Lock()
+        self.running_lock = threading.Lock()
+        self.q = JQueue()
+        self.kvp_file = None
+        self.incarnation_no = self._get_incarnation_no()
+        self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
+                                                  self.incarnation_no)
+        self._current_offset = 0
+
+    def _get_incarnation_no(self):
+        """
+        Use the system boot time as the incarnation number.
+        The incarnation number is used to distinguish old data
+        stored in kvp pools from the data of the current boot.
+        """
+        uptime_str = util.uptime()
+        try:
+            return int(time.time() - float(uptime_str))
+        except ValueError:
+            LOG.warning("uptime '%s' not in correct format.", uptime_str)
+            return 0
+
+    def _iterate_kvps(self, offset):
+        """iterate the kvp file from the current offset."""
+        try:
+            with open(self._kvp_file_path, 'rb+') as f:
+                self.kvp_file = f
+                fcntl.flock(f, fcntl.LOCK_EX)
+                f.seek(offset)
+                record_data = f.read(self.HV_KVP_RECORD_SIZE)
+                while len(record_data) == self.HV_KVP_RECORD_SIZE:
+                    self._current_offset += self.HV_KVP_RECORD_SIZE
+                    kvp_item = self._decode_kvp_item(record_data)
+                    yield kvp_item
+                    record_data = f.read(self.HV_KVP_RECORD_SIZE)
+                fcntl.flock(f, fcntl.LOCK_UN)
+        finally:
+            self.kvp_file = None
+
+    def _event_key(self, event):
+        """
+        the event key format is:
+        CLOUD_INIT|<incarnation number>|<event_type>|<event_name>
+        """
+        return u"{0}|{1}|{2}".format(self.event_key_prefix,
+                                     event.event_type, event.name)
+
+    def _encode_kvp_item(self, key, value):
+        data = (struct.pack("%ds%ds" % (
+                self.HV_KVP_EXCHANGE_MAX_KEY_SIZE,
+                self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE),
+                key.encode('utf-8'), value.encode('utf-8')))
+        return data
+
+    def _decode_kvp_item(self, record_data):
+        record_data_len = len(record_data)
+        if record_data_len != self.HV_KVP_RECORD_SIZE:
+            raise ReportException(
+                "record_data len {0} does not match record size {1}."
+                .format(record_data_len, self.HV_KVP_RECORD_SIZE))
+        k = (record_data[0:self.HV_KVP_EXCHANGE_MAX_KEY_SIZE].decode('utf-8')
+                                                             .strip('\x00'))
+        v = (
+            record_data[
+                self.HV_KVP_EXCHANGE_MAX_KEY_SIZE:self.HV_KVP_RECORD_SIZE
+                ].decode('utf-8').strip('\x00'))
+
+        return {'key': k, 'value': v}
+
+    def _update_kvp_item(self, record_data):
+        if self.kvp_file is None:
+            raise ReportException(
+                "kvp file '{0}' not opened."
+                .format(self._kvp_file_path))
+        self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1)
+        self.kvp_file.write(record_data)
+
+    def _append_kvp_item(self, record_data):
+        with open(self._kvp_file_path, 'rb+') as f:
+            fcntl.flock(f, fcntl.LOCK_EX)
+            # seek to end of the file
+            f.seek(0, 2)
+            f.write(record_data)
+            f.flush()
+            fcntl.flock(f, fcntl.LOCK_UN)
+            self._current_offset = f.tell()
+
+    def _break_down(self, key, meta_data, description):
+        del meta_data[self.MSG_KEY]
+        des_in_json = json.dumps(description)
+        des_in_json = des_in_json[1:(len(des_in_json) - 1)]
+        i = 0
+        result_array = []
+        message_place_holder = "\"" + self.MSG_KEY + "\":\"\""
+        while True:
+            meta_data[self.DESC_IDX_KEY] = i
+            meta_data[self.MSG_KEY] = ''
+            data_without_desc = json.dumps(meta_data,
+                                           separators=self.JSON_SEPARATORS)
+            room_for_desc = (
+                self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE -
+                len(data_without_desc) - 8)
+            value = data_without_desc.replace(
+                message_place_holder,
+                '"{key}":"{desc}"'.format(
+                    key=self.MSG_KEY, desc=des_in_json[:room_for_desc]))
+            result_array.append(self._encode_kvp_item(key, value))
+            i += 1
+            des_in_json = des_in_json[room_for_desc:]
+            if len(des_in_json) == 0:
+                break
+        return result_array
+
+    def _encode_event(self, event):
+        """
+        Encode the event into kvp data bytes.
+        If the event content exceeds the maximum length of a kvp value,
+        it is cut into multiple slices.
+        """
+        key = self._event_key(event)
+        meta_data = {
+                "name": event.name,
+                "type": event.event_type,
+                "ts": (datetime.utcfromtimestamp(event.timestamp)
+                       .isoformat() + 'Z'),
+                }
+        if hasattr(event, self.RESULT_KEY):
+            meta_data[self.RESULT_KEY] = event.result
+        meta_data[self.MSG_KEY] = event.description
+        value = json.dumps(meta_data, separators=self.JSON_SEPARATORS)
+        # if it exceeds the maximum length of a kvp value,
+        # break it down into slices.
+        # this should be a rare corner case.
+        if len(value) > self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE:
+            return self._break_down(key, meta_data, event.description)
+        else:
+            data = self._encode_kvp_item(key, value)
+            return [data]
+
+    def _publish_event_routine(self):
+        while True:
+            event = None
+            try:
+                # pop the next event without blocking.
+                event = self.q.get_nowait()
+                need_append = True
+                try:
+                    if not os.path.exists(self._kvp_file_path):
+                        LOG.warning(
+                            "skip writing events %s to %s. file not present.",
+                            event.as_string(),
+                            self._kvp_file_path)
+                    encoded_event = self._encode_event(event)
+                    # write each encoded slice of the event
+                    for encoded_data in encoded_event:
+                        for kvp in self._iterate_kvps(self._current_offset):
+                            match = (
+                                re.match(
+                                    r"^{0}\|(\d+)\|.+"
+                                    .format(self.EVENT_PREFIX),
+                                    kvp['key']
+                                ))
+                            if match:
+                                match_groups = match.groups(0)
+                                if int(match_groups[0]) < self.incarnation_no:
+                                    need_append = False
+                                    self._update_kvp_item(encoded_data)
+                                    break
+                        if need_append:
+                            self._append_kvp_item(encoded_data)
+                except IOError as e:
+                    LOG.warning(
+                        "failed posting event to kvp: %s e:%s",
+                        event.as_string(), e)
+                    self.running = False
+                    break
+                finally:
+                    self.q.task_done()
+            except queue.Empty:
+                with self.queue_lock:
+                    # double check the queue is empty
+                    if self.q.empty():
+                        self.running = False
+                        break
+
+    def trigger_publish_event(self):
+        if not self.running:
+            with self.running_lock:
+                if not self.running:
+                    self.running = True
+                    thread = threading.Thread(
+                        target=self._publish_event_routine)
+                    thread.start()
+
+    # Since saving to the kvp pool can be time consuming when the pool
+    # already contains a chunk of data, defer the write to another
+    # thread.
+    def publish_event(self, event):
+        if (not self._event_types or event.event_type in self._event_types):
+            with self.queue_lock:
+                self.q.put(event)
+            self.trigger_publish_event()
+
+
 available_handlers = DictRegistry()
 available_handlers.register_item('log', LogHandler)
 available_handlers.register_item('print', PrintHandler)
 available_handlers.register_item('webhook', WebHookHandler)
+available_handlers.register_item('hyperv', HyperVKvpReportingHandler)
 
 # vi: ts=4 expandtab
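
A note on the key scheme used above: every record is written under a key of
the form CLOUD_INIT|<incarnation>|<event_type>|<event_name>, where the
incarnation number is the boot time in seconds since the epoch. After a
reboot the incarnation number grows, so records left over from the previous
boot match the regex in _publish_event_routine and are overwritten in place
instead of growing the pool file. A key produced by this handler could look
like the following (the timestamp and event name are hypothetical):

    CLOUD_INIT|1535673600|finish|init-network/check-cache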
diff --git a/cloudinit/stages.py b/cloudinit/stages.py
index c132b57..8874d40 100644
--- a/cloudinit/stages.py
+++ b/cloudinit/stages.py
@@ -510,7 +510,7 @@ class Init(object):
                 # The default frequency if handlers don't have one
                 'frequency': frequency,
                 # This will be used when new handlers are found
-                # to help write there contents to files with numbered
+                # to help write their contents to files with numbered
                 # names...
                 'handlercount': 0,
                 'excluded': excluded,
diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py
new file mode 100644
index 0000000..2e64c6c
--- /dev/null
+++ b/tests/unittests/test_reporting_hyperv.py
@@ -0,0 +1,134 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.reporting import events
+from cloudinit.reporting import handlers
+
+import json
+import os
+
+from cloudinit import util
+from cloudinit.tests.helpers import CiTestCase
+
+
+class TestKvpEncoding(CiTestCase):
+    def test_encode_decode(self):
+        kvp = {'key': 'key1', 'value': 'value1'}
+        kvp_reporting = handlers.HyperVKvpReportingHandler()
+        data = kvp_reporting._encode_kvp_item(kvp['key'], kvp['value'])
+        self.assertEqual(len(data), kvp_reporting.HV_KVP_RECORD_SIZE)
+        decoded_kvp = kvp_reporting._decode_kvp_item(data)
+        self.assertEqual(kvp, decoded_kvp)
+
+
+class TestKvpReporter(CiTestCase):
+    def setUp(self):
+        super(TestKvpReporter, self).setUp()
+        self.tmp_file_path = self.tmp_path('kvp_pool_file')
+        util.ensure_file(self.tmp_file_path)
+
+    def test_event_type_can_be_filtered(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path,
+            event_types=['foo', 'bar'])
+
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name', 'description'))
+        reporter.publish_event(
+            events.ReportingEvent('some_other', 'name', 'description3'))
+        reporter.q.join()
+
+        kvps = list(reporter._iterate_kvps(0))
+        self.assertEqual(1, len(kvps))
+
+        reporter.publish_event(
+            events.ReportingEvent('bar', 'name', 'description2'))
+        reporter.q.join()
+        kvps = list(reporter._iterate_kvps(0))
+        self.assertEqual(2, len(kvps))
+
+        self.assertIn('foo', kvps[0]['key'])
+        self.assertIn('bar', kvps[1]['key'])
+        self.assertNotIn('some_other', kvps[0]['key'])
+        self.assertNotIn('some_other', kvps[1]['key'])
+
+    def test_events_are_over_written(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+
+        self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
+
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name1', 'description'))
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name2', 'description'))
+        reporter.q.join()
+        self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
+
+        reporter2 = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter2.incarnation_no = reporter.incarnation_no + 1
+        reporter2.publish_event(
+            events.ReportingEvent('foo', 'name3', 'description'))
+        reporter2.q.join()
+
+        self.assertEqual(2, len(list(reporter2._iterate_kvps(0))))
+
+    def test_events_with_higher_incarnation_not_over_written(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+
+        self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
+
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name1', 'description'))
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name2', 'description'))
+        reporter.q.join()
+        self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
+
+        reporter3 = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter3.incarnation_no = reporter.incarnation_no - 1
+        reporter3.publish_event(
+            events.ReportingEvent('foo', 'name3', 'description'))
+        reporter3.q.join()
+        self.assertEqual(3, len(list(reporter3._iterate_kvps(0))))
+
+    def test_finish_event_result_is_logged(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter.publish_event(
+            events.FinishReportingEvent('name2', 'description1',
+                                        result=events.status.FAIL))
+        reporter.q.join()
+        self.assertIn('FAIL', list(reporter._iterate_kvps(0))[0]['value'])
+
+    def test_file_operation_issue(self):
+        os.remove(self.tmp_file_path)
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter.publish_event(
+            events.FinishReportingEvent('name2', 'description1',
+                                        result=events.status.FAIL))
+        reporter.q.join()
+
+    def test_event_very_long(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE
+        long_event = events.FinishReportingEvent(
+            'event_name',
+            description,
+            result=events.status.FAIL)
+        reporter.publish_event(long_event)
+        reporter.q.join()
+        kvps = list(reporter._iterate_kvps(0))
+        self.assertEqual(3, len(kvps))
+
+        # reassemble the description from the kvps to check it is all there
+        full_description = ''
+        for i in range(len(kvps)):
+            msg_slice = json.loads(kvps[i]['value'])
+            self.assertEqual(msg_slice['msg_i'], i)
+            full_description += msg_slice['msg']
+        self.assertEqual(description, full_description)
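
To exercise the new handler locally without a Hyper-V host, the unit tests
above suggest a rough sketch along these lines (the scratch pool file path
is hypothetical; the real guest pool is /var/lib/hyperv/.kvp_pool_1):

    from cloudinit import util
    from cloudinit.reporting import events, handlers

    pool = '/tmp/kvp_pool_test'  # hypothetical scratch file
    util.ensure_file(pool)

    reporter = handlers.HyperVKvpReportingHandler(kvp_file_path=pool)
    reporter.publish_event(
        events.ReportingEvent('init', 'demo', 'hello from kvp reporting'))
    reporter.q.join()  # wait for the background writer thread to drain

    for kvp in reporter._iterate_kvps(0):
        print(kvp['key'], '=>', kvp['value'])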
