cloud-init-dev team mailing list archive
-
cloud-init-dev team
-
Mailing list archive
-
Message #05424
[Merge] ~raharper/cloud-init:fix/threaded-reporter into cloud-init:master
Ryan Harper has proposed merging ~raharper/cloud-init:fix/threaded-reporter into cloud-init:master.
Commit message:
hyperv_reporting_handler: simplify threaded publisher
Switch the implementation to a daemon thread which uses a
blocking get from the Queue. No additional locking or flag checking
is needed since the Queue itself handles acquiring the lock as needed.
cloud-init only has a single producer (the main thread calling publish)
and the consumer will read all events in the queue and write them out.
Using the daemon mode of the thread handles flushing the queue on
main exit in python3; in python2.7 we handle the EOFError that results
when the publish thread's call to get() fails, indicating the main thread
has exited.
The result is that the handler is no longer spawning a thread on each
publish event but rather creates a single thread when we start up
the reporter and we remove any additional use of separate locks and
flags as we only have a single Queue object and we're only calling
queue.put() from main thread and queue.get() from consuming thread.
Requested reviews:
cloud-init committers (cloud-init-dev)
For more details, see:
https://code.launchpad.net/~raharper/cloud-init/+git/cloud-init/+merge/354073
--
Your team cloud-init committers is requested to review the proposed merge of ~raharper/cloud-init:fix/threaded-reporter into cloud-init:master.
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index 4b4bb39..75c0d37 100644
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -16,10 +16,8 @@ from cloudinit import (url_helper, util)
from datetime import datetime
if six.PY2:
- import multiprocessing.queues as queue
from multiprocessing.queues import JoinableQueue as JQueue
else:
- import queue
from queue import Queue as JQueue
LOG = logging.getLogger(__name__)
@@ -134,15 +132,16 @@ class HyperVKvpReportingHandler(ReportingHandler):
super(HyperVKvpReportingHandler, self).__init__()
self._kvp_file_path = kvp_file_path
self._event_types = event_types
- self.running = False
- self.queue_lock = threading.Lock()
- self.running_lock = threading.Lock()
self.q = JQueue()
self.kvp_file = None
self.incarnation_no = self._get_incarnation_no()
self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
self.incarnation_no)
self._current_offset = 0
+ self.publish_thread = threading.Thread(
+ target=self._publish_event_routine)
+ self.publish_thread.daemon = True
+ self.publish_thread.start()
def _get_incarnation_no(self):
"""
@@ -276,10 +275,8 @@ class HyperVKvpReportingHandler(ReportingHandler):
def _publish_event_routine(self):
while True:
- event = None
try:
- # acquire the lock.
- event = self.q.get_nowait()
+ event = self.q.get(block=True)
need_append = True
try:
if not os.path.exists(self._kvp_file_path):
@@ -302,41 +299,27 @@ class HyperVKvpReportingHandler(ReportingHandler):
if int(match_groups[0]) < self.incarnation_no:
need_append = False
self._update_kvp_item(encoded_data)
- break
+ continue
if need_append:
self._append_kvp_item(encoded_data)
except IOError as e:
LOG.warning(
"failed posting event to kvp: %s e:%s",
event.as_string(), e)
- self.running = False
- break
finally:
self.q.task_done()
- except queue.Empty:
- with self.queue_lock:
- # double check the queue is empty
- if self.q.empty():
- self.running = False
- break
-
- def trigger_publish_event(self):
- if not self.running:
- with self.running_lock:
- if not self.running:
- self.running = True
- thread = threading.Thread(
- target=self._publish_event_routine)
- thread.start()
+
+ # when main process exits, q.get() will through EOFError
+ # indicating we should exit this thread.
+ except EOFError:
+ return
# since the saving to the kvp pool can be a time costing task
# if the kvp pool already contains a chunk of data,
# so defer it to another thread.
def publish_event(self, event):
if (not self._event_types or event.event_type in self._event_types):
- with self.queue_lock:
- self.q.put(event)
- self.trigger_publish_event()
+ self.q.put(event)
available_handlers = DictRegistry()
Follow ups