cloud-init-dev team mailing list archive
-
cloud-init-dev team
-
Mailing list archive
-
Message #04980
[Merge] ~smoser/cloud-init:fix/1775371-non-utf8-in-proc-env into cloud-init:master
Scott Moser has proposed merging ~smoser/cloud-init:fix/1775371-non-utf8-in-proc-env into cloud-init:master.
Commit message:
Fix get_proc_env for pids that have non-utf8 content in environment.
There is no requirement that the environment of a process contains
only utf-8 data. This modifies get_proc_env to read the data as
binary and to decode it only if an encoding is provided.
The default case is now that we do:
contents.decode('utf-8', 'replace')
rather than
contents.decode('utf-8', 'strict')
LP: #1775371
Requested reviews:
cloud-init commiters (cloud-init-dev)
Related bugs:
Bug #1775371 in cloud-init: "cloud-init (18.2) fails on decoding proc1 env"
https://bugs.launchpad.net/cloud-init/+bug/1775371
For more details, see:
https://code.launchpad.net/~smoser/cloud-init/+git/cloud-init/+merge/347698
see commit message
--
Your team cloud-init commiters is requested to review the proposed merge of ~smoser/cloud-init:fix/1775371-non-utf8-in-proc-env into cloud-init:master.
diff --git a/cloudinit/util.py b/cloudinit/util.py
index d9b61cf..441d5f7 100644
--- a/cloudinit/util.py
+++ b/cloudinit/util.py
@@ -2080,24 +2080,33 @@ def is_container():
return False
-def get_proc_env(pid):
+def get_proc_env(pid, encoding='utf-8', errors='replace'):
"""
Return the environment in a dict that a given process id was started with.
- """
- env = {}
- fn = os.path.join("/proc/", str(pid), "environ")
+ @param encoding: if true, then decoding will be done with
+ .decode(encoding, errors) and text will be returned.
+ if false then binary will be returned.
+ @param errors: only used if encoding is true."""
+ fn = os.path.join("/proc", str(pid), "environ")
+
try:
- contents = load_file(fn)
- toks = contents.split("\x00")
- for tok in toks:
- if tok == "":
- continue
- (name, val) = tok.split("=", 1)
- if name:
- env[name] = val
+ contents = load_file(fn, decode=False)
except (IOError, OSError):
- pass
+ return {}
+
+ env = {}
+ null, equal = (b"\x00", b"=")
+ if encoding:
+ null, equal = ("\x00", "=")
+ contents = contents.decode(encoding, errors)
+
+ for tok in contents.split(null):
+ if not tok:
+ continue
+ (name, val) = tok.split(equal, 1)
+ if name:
+ env[name] = val
return env
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index d774f3d..ea1c8a1 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1081,4 +1081,60 @@ class TestLoadShellContent(helpers.TestCase):
''])))
+class TestGetProcEnv(helpers.TestCase):
+ """test get_proc_env."""
+ null = b'\x00'
+ simple1 = b'HOME=/'
+ simple2 = b'PATH=/bin:/sbin'
+ bootflag = b'BOOTABLE_FLAG=\x80' # from LP: #1775371
+ mixed = b'MIXED=' + b'ab\xccde'
+
+ def _enc(self, blob, encoding='utf-8', errors='replace'):
+ # return blob.encode(encoding, errors)
+ return blob.decode(encoding, errors)
+
+ @mock.patch("cloudinit.util.load_file")
+ def test_non_utf8_in_environment(self, m_load_file):
+ """env may have non utf-8 decodable content."""
+ content = self.null.join(
+ (self.bootflag, self.simple1, self.simple2, self.mixed))
+ m_load_file.return_value = content
+
+ self.assertEqual(
+ {'BOOTABLE_FLAG': self._enc(self.bootflag.split(b'=')[1]),
+ 'HOME': '/', 'PATH': '/bin:/sbin',
+ 'MIXED': self._enc(self.mixed.split(b'=')[1])},
+ util.get_proc_env(1))
+ self.assertEqual(1, m_load_file.call_count)
+
+ @mock.patch("cloudinit.util.load_file")
+ def test_encoding_none_returns_bytes(self, m_load_file):
+ """encoding none returns bytes."""
+ lines = (self.bootflag, self.simple1, self.simple2, self.mixed)
+ content = self.null.join(lines)
+ m_load_file.return_value = content
+
+ self.assertEqual(
+ dict([t.split(b'=') for t in lines]),
+ util.get_proc_env(1, encoding=None))
+ self.assertEqual(1, m_load_file.call_count)
+
+ @mock.patch("cloudinit.util.load_file")
+ def test_all_utf8_encoded(self, m_load_file):
+ """common path where only utf-8 decodable content."""
+ content = self.null.join((self.simple1, self.simple2))
+ m_load_file.return_value = content
+ self.assertEqual(
+ {'HOME': '/', 'PATH': '/bin:/sbin'},
+ util.get_proc_env(1))
+ self.assertEqual(1, m_load_file.call_count)
+
+ @mock.patch("cloudinit.util.load_file")
+ def test_non_existing_file_returns_empty_dict(self, m_load_file):
+ """as implemented, a non-existing pid returns empty dict.
+ This is how it was originally implemented."""
+ m_load_file.side_effect = OSError("File does not exist.")
+ self.assertEqual({}, util.get_proc_env(1))
+ self.assertEqual(1, m_load_file.call_count)
+
# vi: ts=4 expandtab
Follow ups