← Back to team overview

cloud-init-dev team mailing list archive

[Merge] lp:~harlowja/cloud-init/mime-gzip-test into lp:cloud-init

 

Joshua Harlow has proposed merging lp:~harlowja/cloud-init/mime-gzip-test into lp:cloud-init.

Requested reviews:
  cloud init development team (cloud-init-dev)

For more details, see:
https://code.launchpad.net/~harlowja/cloud-init/mime-gzip-test/+merge/176614

Test for mime gzipped segments to ensure decompression is occuring.
-- 
https://code.launchpad.net/~harlowja/cloud-init/mime-gzip-test/+merge/176614
Your team cloud init development team is requested to review the proposed merge of lp:~harlowja/cloud-init/mime-gzip-test into lp:cloud-init.
=== modified file 'tests/unittests/test_userdata.py'
--- tests/unittests/test_userdata.py	2013-05-10 05:34:31 +0000
+++ tests/unittests/test_userdata.py	2013-07-24 07:36:31 +0000
@@ -2,10 +2,14 @@
 
 import StringIO
 
+import gzip
 import logging
+import mocker
 import os
 
 from email.mime.base import MIMEBase
+from email.mime.multipart import MIMEMultipart
+from email.mime.application import MIMEApplication
 
 from cloudinit import handlers
 from cloudinit import helpers as c_helpers
@@ -118,7 +122,7 @@
         ci.datasource = FakeDataSource(data)
 
         mock_write = self.mocker.replace("cloudinit.util.write_file",
-                                              passthrough=False)
+                                         passthrough=False)
         mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
         self.mocker.replay()
 
@@ -129,6 +133,46 @@
             "Unhandled non-multipart (text/x-not-multipart) userdata:",
             log_file.getvalue())
 
+    def test_mime_gzip_compressed(self):
+        """Tests that individual message gzip encoding works."""
+
+        def gzip_part(text):
+            contents = StringIO.StringIO()
+            f = gzip.GzipFile(fileobj=contents, mode='w')
+            f.write(str(text))
+            f.flush()
+            f.close()
+            return MIMEApplication(contents.getvalue(), 'gzip')
+
+        base_content1 = '''
+#cloud-config
+a: 2
+'''
+
+        base_content2 = '''
+#cloud-config
+b: 3
+c: 4
+'''
+
+        message = MIMEMultipart('test')
+        message.attach(gzip_part(base_content1))
+        message.attach(gzip_part(base_content2))
+        ci = stages.Init()
+        ci.datasource = FakeDataSource(str(message))
+        new_root = self.makeDir()
+        self.patchUtils(new_root)
+        self.patchOS(new_root)
+        ci.fetch()
+        ci.consume_userdata()
+        contents = util.load_file(ci.paths.get_ipath("cloud_config"))
+        contents = util.load_yaml(contents)
+        self.assertTrue(isinstance(contents, dict))
+        self.assertEquals(3, len(contents))
+        self.assertEquals(2, contents['a'])
+        self.assertEquals(3, contents['b'])
+        self.assertEquals(4, contents['c'])
+
     def test_mime_text_plain(self):
         """Mime message of type text/plain is ignored but shows warning."""
         ci = stages.Init()
@@ -137,7 +181,7 @@
         ci.datasource = FakeDataSource(message.as_string())
 
         mock_write = self.mocker.replace("cloudinit.util.write_file",
-                                              passthrough=False)
+                                         passthrough=False)
         mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
         self.mocker.replay()
 
@@ -156,7 +200,7 @@
 
         outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
         mock_write = self.mocker.replace("cloudinit.util.write_file",
-                                              passthrough=False)
+                                         passthrough=False)
         mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
         mock_write(outpath, script, 0700)
         self.mocker.replay()
@@ -176,7 +220,7 @@
 
         outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
         mock_write = self.mocker.replace("cloudinit.util.write_file",
-                                              passthrough=False)
+                                         passthrough=False)
         mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
         mock_write(outpath, script, 0700)
         self.mocker.replay()


Follow ups