← Back to team overview

cloud-init-dev team mailing list archive

[Merge] ~smoser/cloud-init:fix/1775371-non-utf8-in-proc-env into cloud-init:master

 

Scott Moser has proposed merging ~smoser/cloud-init:fix/1775371-non-utf8-in-proc-env into cloud-init:master.

Commit message:
Fix get_proc_env for pids that have non-utf8 content in environment.

There is no requirement that the environment of a process contains
only utf-8 data.  This modifies get_proc_env to support it reading
data as binary and decoding if provided with an encoding.

The default case is now that we now do:
 contents.decode('utf-8', 'replace')
rather than
 contents.decode('utf-8', 'strict')

LP: #1775371

Requested reviews:
  cloud-init commiters (cloud-init-dev)
Related bugs:
  Bug #1775371 in cloud-init: "cloud-init (18.2) fails on decoding proc1 env"
  https://bugs.launchpad.net/cloud-init/+bug/1775371

For more details, see:
https://code.launchpad.net/~smoser/cloud-init/+git/cloud-init/+merge/347698

see commit message
-- 
Your team cloud-init commiters is requested to review the proposed merge of ~smoser/cloud-init:fix/1775371-non-utf8-in-proc-env into cloud-init:master.
diff --git a/cloudinit/util.py b/cloudinit/util.py
index d9b61cf..441d5f7 100644
--- a/cloudinit/util.py
+++ b/cloudinit/util.py
@@ -2080,24 +2080,33 @@ def is_container():
     return False
 
 
-def get_proc_env(pid):
+def get_proc_env(pid, encoding='utf-8', errors='replace'):
     """
     Return the environment in a dict that a given process id was started with.
-    """
 
-    env = {}
-    fn = os.path.join("/proc/", str(pid), "environ")
+    @param encoding: if true, then decoding will be done with
+                     .decode(encoding, errors) and text will be returned.
+                     if false then binary will be returned.
+    @param errors:   only used if encoding is true."""
+    fn = os.path.join("/proc", str(pid), "environ")
+
     try:
-        contents = load_file(fn)
-        toks = contents.split("\x00")
-        for tok in toks:
-            if tok == "":
-                continue
-            (name, val) = tok.split("=", 1)
-            if name:
-                env[name] = val
+        contents = load_file(fn, decode=False)
     except (IOError, OSError):
-        pass
+        return {}
+
+    env = {}
+    null, equal = (b"\x00", b"=")
+    if encoding:
+        null, equal = ("\x00", "=")
+        contents = contents.decode(encoding, errors)
+
+    for tok in contents.split(null):
+        if not tok:
+            continue
+        (name, val) = tok.split(equal, 1)
+        if name:
+            env[name] = val
     return env
 
 
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index d774f3d..ea1c8a1 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -1081,4 +1081,60 @@ class TestLoadShellContent(helpers.TestCase):
                 ''])))
 
 
+class TestGetProcEnv(helpers.TestCase):
+    """test get_proc_env."""
+    null = b'\x00'
+    simple1 = b'HOME=/'
+    simple2 = b'PATH=/bin:/sbin'
+    bootflag = b'BOOTABLE_FLAG=\x80'  # from LP: #1775371
+    mixed = b'MIXED=' + b'ab\xccde'
+
+    def _enc(self, blob, encoding='utf-8', errors='replace'):
+        # return blob.encode(encoding, errors)
+        return blob.decode(encoding, errors)
+
+    @mock.patch("cloudinit.util.load_file")
+    def test_non_utf8_in_environment(self, m_load_file):
+        """env may have non utf-8 decodable content."""
+        content = self.null.join(
+            (self.bootflag, self.simple1, self.simple2, self.mixed))
+        m_load_file.return_value = content
+
+        self.assertEqual(
+            {'BOOTABLE_FLAG': self._enc(self.bootflag.split(b'=')[1]),
+             'HOME': '/', 'PATH': '/bin:/sbin',
+             'MIXED': self._enc(self.mixed.split(b'=')[1])},
+            util.get_proc_env(1))
+        self.assertEqual(1, m_load_file.call_count)
+
+    @mock.patch("cloudinit.util.load_file")
+    def test_encoding_none_returns_bytes(self, m_load_file):
+        """encoding none returns bytes."""
+        lines = (self.bootflag, self.simple1, self.simple2, self.mixed)
+        content = self.null.join(lines)
+        m_load_file.return_value = content
+
+        self.assertEqual(
+            dict([t.split(b'=') for t in lines]),
+            util.get_proc_env(1, encoding=None))
+        self.assertEqual(1, m_load_file.call_count)
+
+    @mock.patch("cloudinit.util.load_file")
+    def test_all_utf8_encoded(self, m_load_file):
+        """common path where only utf-8 decodable content."""
+        content = self.null.join((self.simple1, self.simple2))
+        m_load_file.return_value = content
+        self.assertEqual(
+            {'HOME': '/', 'PATH': '/bin:/sbin'},
+            util.get_proc_env(1))
+        self.assertEqual(1, m_load_file.call_count)
+
+    @mock.patch("cloudinit.util.load_file")
+    def test_non_existing_file_returns_empty_dict(self, m_load_file):
+        """as implemented, a non-existing pid returns empty dict.
+        This is how it was originally implemented."""
+        m_load_file.side_effect = OSError("File does not exist.")
+        self.assertEqual({}, util.get_proc_env(1))
+        self.assertEqual(1, m_load_file.call_count)
+
 # vi: ts=4 expandtab

Follow ups