← Back to team overview

curtin-dev team mailing list archive

[Merge] ~raharper/curtin:fix/unittest-prevent-subp-by-default into curtin:master

 

Ryan Harper has proposed merging ~raharper/curtin:fix/unittest-prevent-subp-by-default into curtin:master.

Commit message:
unittest: do not allow util.subp by default

Bring over our util.subp mock from cloud-init.  Rework the unit tests to enable subp only where it is needed.

Requested reviews:
  curtin developers (curtin-dev)

For more details, see:
https://code.launchpad.net/~raharper/curtin/+git/curtin/+merge/382604
-- 
Your team, curtin developers, is requested to review the proposed merge of ~raharper/curtin:fix/unittest-prevent-subp-by-default into curtin:master.
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 9514745..cc9b812 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -1,8 +1,9 @@
 # This file is part of curtin. See LICENSE file for copyright and license info.
 
-import contextlib
 import imp
 import importlib
+import io
+import logging
 import mock
 import os
 import random
@@ -11,8 +12,15 @@ import string
 import tempfile
 from unittest import TestCase, skipIf
 
+try:
+    from contextlib import contextmanager
+except ImportError:
+    from contextlib2 import contextmanager
+
 from curtin import util
 
+_real_subp = util.subp
+
 
 def builtin_module_name():
     options = ('builtins', '__builtin__')
@@ -27,7 +35,7 @@ def builtin_module_name():
             return name
 
 
-@contextlib.contextmanager
+@contextmanager
 def simple_mocked_open(content=None):
     if not content:
         content = ''
@@ -54,6 +62,68 @@ def skipUnlessJsonSchema():
 class CiTestCase(TestCase):
     """Common testing class which all curtin unit tests subclass."""
 
+    with_logs = False
+    allowed_subp = False
+    SUBP_SHELL_TRUE = "shell=True"
+
+    @contextmanager
+    def allow_subp(self, allowed_subp):
+        orig = self.allowed_subp
+        try:
+            self.allowed_subp = allowed_subp
+            yield
+        finally:
+            self.allowed_subp = orig
+
+    def setUp(self):
+        super(CiTestCase, self).setUp()
+        if self.with_logs:
+            # Create a log handler so unit tests can search expected logs.
+            self.logger = logging.getLogger()
+            self.logs = io.StringIO()
+            formatter = logging.Formatter('%(levelname)s: %(message)s')
+            handler = logging.StreamHandler(self.logs)
+            handler.setFormatter(formatter)
+            self.old_handlers = self.logger.handlers
+            self.logger.handlers = [handler]
+        if self.allowed_subp is True:
+            util.subp = _real_subp
+        else:
+            util.subp = self._fake_subp
+
+    def _fake_subp(self, *args, **kwargs):
+        if 'args' in kwargs:
+            cmd = kwargs['args']
+        else:
+            cmd = args[0]
+
+        if not isinstance(cmd, str):
+            cmd = cmd[0]
+        pass_through = False
+        if not isinstance(self.allowed_subp, (list, bool)):
+            raise TypeError("self.allowed_subp supports list or bool.")
+        if isinstance(self.allowed_subp, bool):
+            pass_through = self.allowed_subp
+        else:
+            pass_through = (
+                (cmd in self.allowed_subp) or
+                (self.SUBP_SHELL_TRUE in self.allowed_subp and
+                 kwargs.get('shell')))
+        if pass_through:
+            return _real_subp(*args, **kwargs)
+        raise Exception(
+            "called subp. set self.allowed_subp=True to allow\n subp(%s)" %
+            ', '.join([str(repr(a)) for a in args] +
+                      ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()]))
+
+    def tearDown(self):
+        if self.with_logs:
+            # Remove the handler we setup
+            logging.getLogger().handlers = self.old_handlers
+            logging.getLogger().level = None
+        util.subp = _real_subp
+        super(CiTestCase, self).tearDown()
+
     def add_patch(self, target, attr, **kwargs):
         """Patches specified target object and sets it as attr on test
         instance also schedules cleanup"""
diff --git a/tests/unittests/test_apt_custom_sources_list.py b/tests/unittests/test_apt_custom_sources_list.py
index fb6eb0c..bf004b1 100644
--- a/tests/unittests/test_apt_custom_sources_list.py
+++ b/tests/unittests/test_apt_custom_sources_list.py
@@ -93,7 +93,9 @@ class TestAptSourceConfigSourceList(CiTestCase):
     def setUp(self):
         super(TestAptSourceConfigSourceList, self).setUp()
         self.new_root = self.tmp_dir()
+        self.add_patch('curtin.util.subp', 'm_subp')
         # self.patchUtils(self.new_root)
+        self.m_subp.return_value = ("amd64", "")
 
     def _apt_source_list(self, cfg, expected):
         "_apt_source_list - Test rendering from template (generic)"
diff --git a/tests/unittests/test_apt_source.py b/tests/unittests/test_apt_source.py
index 353cdf8..6ae5579 100644
--- a/tests/unittests/test_apt_source.py
+++ b/tests/unittests/test_apt_source.py
@@ -75,6 +75,8 @@ class TestAptSourceConfig(CiTestCase):
         self.aptlistfile3 = os.path.join(self.tmp, "single-deb3.list")
         self.join = os.path.join
         self.matcher = re.compile(ADD_APT_REPO_MATCH).search
+        self.add_patch('curtin.util.subp', 'm_subp')
+        self.m_subp.return_value = ('s390x', '')
 
     @staticmethod
     def _add_apt_sources(*args, **kwargs):
diff --git a/tests/unittests/test_block.py b/tests/unittests/test_block.py
index 707386f..dcdcf51 100644
--- a/tests/unittests/test_block.py
+++ b/tests/unittests/test_block.py
@@ -522,6 +522,12 @@ class TestPartTableSignature(CiTestCase):
     gpt_content_4k = b'\x00' * 0x800 + b'EFI PART' + b'\x00' * (0x800 - 8)
     null_content = b'\x00' * 0xf00
 
+    def setUp(self):
+        super(TestPartTableSignature, self).setUp()
+        self.add_patch('curtin.util.subp', 'm_subp')
+        self.m_subp.side_effect = iter([
+            util.ProcessExecutionError(stdout="", stderr="", exit_code=1)])
+
     def _test_util_load_file(self, content, device, read_len, offset, decode):
         return (bytes if not decode else str)(content[offset:offset+read_len])
 
@@ -577,15 +583,13 @@ class TestPartTableSignature(CiTestCase):
                 (self.assertTrue if expected else self.assertFalse)(
                     block.check_efi_signature(self.blockdev))
 
-    @mock.patch('curtin.block.util.subp')
-    def test_check_vtoc_signature_finds_vtoc_returns_true(self, mock_subp):
-        mock_subp.return_value = ("vtoc.....ok", "")
+    def test_check_vtoc_signature_finds_vtoc_returns_true(self):
+        self.m_subp.side_effect = iter([("vtoc.....ok", "")])
         self.assertTrue(block.check_vtoc_signature(self.blockdev))
 
-    @mock.patch('curtin.block.util.subp')
-    def test_check_vtoc_signature_returns_false_with_no_sig(self, mock_subp):
-        mock_subp.side_effect = [
-            util.ProcessExecutionError(stdout="", stderr="", exit_code=1)]
+    def test_check_vtoc_signature_returns_false_with_no_sig(self):
+        self.m_subp.side_effect = iter([
+            util.ProcessExecutionError(stdout="", stderr="", exit_code=1)])
         self.assertFalse(block.check_vtoc_signature(self.blockdev))
 
 
diff --git a/tests/unittests/test_block_zfs.py b/tests/unittests/test_block_zfs.py
index 3508d4b..2be7c41 100644
--- a/tests/unittests/test_block_zfs.py
+++ b/tests/unittests/test_block_zfs.py
@@ -425,6 +425,9 @@ class TestAssertZfsSupported(CiTestCase):
     def setUp(self):
         super(TestAssertZfsSupported, self).setUp()
 
+    def tearDown(self):
+        self.allowed_subp = False
+
     @mock.patch('curtin.block.zfs.get_supported_filesystems')
     @mock.patch('curtin.block.zfs.distro')
     @mock.patch('curtin.block.zfs.util')
@@ -481,7 +484,7 @@ class TestAssertZfsSupported(CiTestCase):
                                                                m_popen,
                                                                ):
         """zfs_assert_supported raises RuntimeError modprobe zfs error"""
-
+        self.allowed_subp = True
         m_arch.return_value = 'amd64'
         m_release.return_value = {'codename': 'bionic'}
         m_supfs.return_value = ['ext4']
diff --git a/tests/unittests/test_clear_holders.py b/tests/unittests/test_clear_holders.py
index 767ae3a..5d8e62b 100644
--- a/tests/unittests/test_clear_holders.py
+++ b/tests/unittests/test_clear_holders.py
@@ -701,13 +701,14 @@ class TestClearHolders(CiTestCase):
         mock_gen_holders_tree.return_value = self.example_holders_trees[1][1]
         clear_holders.assert_clear(device)
 
+    @mock.patch('curtin.block.clear_holders.udev')
     @mock.patch('curtin.block.clear_holders.multipath')
     @mock.patch('curtin.block.clear_holders.lvm')
     @mock.patch('curtin.block.clear_holders.zfs')
     @mock.patch('curtin.block.clear_holders.mdadm')
     @mock.patch('curtin.block.clear_holders.util')
     def test_start_clear_holders_deps(self, mock_util, mock_mdadm, mock_zfs,
-                                      mock_lvm, mock_mp):
+                                      mock_lvm, mock_mp, mock_udev):
         mock_zfs.zfs_supported.return_value = True
         clear_holders.start_clear_holders_deps()
         mock_mdadm.mdadm_assemble.assert_called_with(
@@ -715,13 +716,15 @@ class TestClearHolders(CiTestCase):
         mock_util.load_kernel_module.assert_has_calls([
                 mock.call('bcache')])
 
+    @mock.patch('curtin.block.clear_holders.udev')
     @mock.patch('curtin.block.clear_holders.multipath')
     @mock.patch('curtin.block.clear_holders.lvm')
     @mock.patch('curtin.block.clear_holders.zfs')
     @mock.patch('curtin.block.clear_holders.mdadm')
     @mock.patch('curtin.block.clear_holders.util')
     def test_start_clear_holders_deps_nozfs(self, mock_util, mock_mdadm,
-                                            mock_zfs, mock_lvm, mock_mp):
+                                            mock_zfs, mock_lvm, mock_mp,
+                                            mock_udev):
         """test that we skip zfs modprobe on unsupported platforms"""
         mock_zfs.zfs_supported.return_value = False
         clear_holders.start_clear_holders_deps()
diff --git a/tests/unittests/test_commands_collect_logs.py b/tests/unittests/test_commands_collect_logs.py
index 1feba18..b5df902 100644
--- a/tests/unittests/test_commands_collect_logs.py
+++ b/tests/unittests/test_commands_collect_logs.py
@@ -288,6 +288,7 @@ class TestCreateTar(CiTestCase):
 
         Configured log_file or post_files which don't exist are ignored.
         """
+        self.add_patch('curtin.util.subp', 'mock_subp')
         tarfile = self.tmp_path('my.tar', _dir=self.new_root)
         log1 = self.tmp_path('some.log', _dir=self.new_root)
         write_file(log1, 'log content')
@@ -314,6 +315,7 @@ class TestCreateTar(CiTestCase):
 
     def test_create_log_tarfile_redacts_maas_credentials(self):
         """create_log_tarfile redacts sensitive maas credentials configured."""
+        self.add_patch('curtin.util.subp', 'mock_subp')
         tarfile = self.tmp_path('my.tar', _dir=self.new_root)
         self.add_patch(
             'curtin.commands.collect_logs._redact_sensitive_information',
diff --git a/tests/unittests/test_gpg.py b/tests/unittests/test_gpg.py
index 2c83ae3..9514f67 100644
--- a/tests/unittests/test_gpg.py
+++ b/tests/unittests/test_gpg.py
@@ -10,6 +10,9 @@ from .helpers import CiTestCase
 
 class TestCurtinGpg(CiTestCase):
 
+    def tearDown(self):
+        self.allowed_subp = False
+
     @patch('curtin.util.subp')
     def test_export_armour(self, mock_subp):
         key = 'DEADBEEF'
@@ -51,6 +54,7 @@ class TestCurtinGpg(CiTestCase):
     @patch('time.sleep')
     @patch('curtin.util._subp')
     def test_recv_key_retry_raises(self, mock_under_subp, mock_sleep):
+        self.allowed_subp = True
         key = 'DEADBEEF'
         keyserver = 'keyserver.ubuntu.com'
         retries = (1, 2, 5, 10)
@@ -74,6 +78,7 @@ class TestCurtinGpg(CiTestCase):
     @patch('time.sleep')
     @patch('curtin.util._subp')
     def test_recv_key_retry_works(self, mock_under_subp, mock_sleep):
+        self.allowed_subp = True
         key = 'DEADBEEF'
         keyserver = 'keyserver.ubuntu.com'
         nr_calls = 2
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 438f7ee..0377357 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -113,6 +113,7 @@ class TestSubp(CiTestCase):
     utf8_invalid = b'ab\xaadef'
     utf8_valid = b'start \xc3\xa9 end'
     utf8_valid_2 = b'd\xc3\xa9j\xc8\xa7'
+    allowed_subp = True
 
     try:
         decode_type = unicode
@@ -552,7 +553,7 @@ class TestTargetPath(CiTestCase):
                          paths.target_path("/target/", "///my/path/"))
 
 
-class TestRunInChroot(CiTestCase):
+class TestRunInChrootTestSubp(CiTestCase):
     """Test the legacy 'RunInChroot'.
 
     The test works by mocking ChrootableTarget's __enter__ to do nothing.
@@ -560,6 +561,7 @@ class TestRunInChroot(CiTestCase):
       a.) RunInChroot is a subclass of ChrootableTarget
       b.) ChrootableTarget's __exit__ only un-does work that its __enter__
           did.  Meaning for our mocked case, it does nothing."""
+    allowed_subp = True
 
     @mock.patch.object(util.ChrootableTarget, "__enter__", new=lambda a: a)
     def test_run_in_chroot_with_target_slash(self):
@@ -567,6 +569,16 @@ class TestRunInChroot(CiTestCase):
             out, err = i(['echo', 'HI MOM'], capture=True)
         self.assertEqual('HI MOM\n', out)
 
+
+class TestRunInChrootTest(CiTestCase):
+    """Test the legacy 'RunInChroot'.
+
+    The test works by mocking ChrootableTarget's __enter__ to do nothing.
+    The assumptions made are:
+      a.) RunInChroot is a subclass of ChrootableTarget
+      b.) ChrootableTarget's __exit__ only un-does work that its __enter__
+          did.  Meaning for our mocked case, it does nothing."""
+
     @mock.patch.object(util.ChrootableTarget, "__enter__", new=lambda a: a)
     @mock.patch("curtin.util.subp")
     def test_run_in_chroot_with_target(self, m_subp):

Follow ups