← Back to team overview

cloud-init-dev team mailing list archive

[Merge] ~fginther/cloud-init:feature/ssh_disable_users into cloud-init:master

 

Francis Ginther has proposed merging ~fginther/cloud-init:feature/ssh_disable_users into cloud-init:master.

Commit message:
Add a configuration option, 'ssh_disable_users', for declaring a list of usernames to disable login via ssh and redirect to the default user.

Also adds unit tests for config/cc_ssh.py to verify both the pre-existing and the new behavior.

Requested reviews:
  cloud-init commiters (cloud-init-dev)
Related bugs:
  Bug #1771198 in cloud-init: "Support disable_root-esque behaviour for other users"
  https://bugs.launchpad.net/cloud-init/+bug/1771198

For more details, see:
https://code.launchpad.net/~fginther/cloud-init/+git/cloud-init/+merge/352053

This re-implements the 'disable_root' option for a list of users, instead of just root.

Testing is provided through unit tests. These did not exist for the config/cc_ssh.py module, so a basic set of tests were created to cover the existing 'disable_root' behavior. These tests were then expanded an modified to match the 'ssh_disable_users' implementation. The 'disable_root: true' option still exists, but it will be converted to 'ssh_disable_users: ["root"]' for processing.
-- 
Your team cloud-init commiters is requested to review the proposed merge of ~fginther/cloud-init:feature/ssh_disable_users into cloud-init:master.
diff --git a/cloudinit/config/cc_ssh.py b/cloudinit/config/cc_ssh.py
old mode 100755
new mode 100644
index 45204a0..52c819b
--- a/cloudinit/config/cc_ssh.py
+++ b/cloudinit/config/cc_ssh.py
@@ -55,6 +55,11 @@ root login is disabled, and root login opts are set to::
 
     no-port-forwarding,no-agent-forwarding,no-X11-forwarding
 
+Login for other users can similarly be disabled with the ``ssh_disable_users``
+config list. Users in this list will have the same ``disable_root_opts``
+applied and references to the string ``$ROOT`` will be replace with the user
+being redirected.
+
 Authorized keys for the default user/first user defined in ``users`` can be
 specified using `ssh_authorized_keys``. Keys should be specified as a list of
 public keys.
@@ -87,6 +92,7 @@ public keys.
         dsa_public: ssh-dsa AAAAB3NzaC1yc2EAAAABIwAAAGEAoPRhIfLvedSDKw7Xd ...
     ssh_genkeytypes: <key type>
     disable_root: <true/false>
+    ssh_disable_users: <list of user to block and redirect to the default user>
     disable_root_opts: <disable root options string>
     ssh_authorized_keys:
         - ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEA3FSyQwBI6Z+nCSjUU ...
@@ -104,7 +110,7 @@ from cloudinit import util
 DISABLE_ROOT_OPTS = (
     "no-port-forwarding,no-agent-forwarding,"
     "no-X11-forwarding,command=\"echo \'Please login as the user \\\"$USER\\\""
-    " rather than the user \\\"root\\\".\';echo;sleep 10\"")
+    " rather than the user \\\"$ROOT\\\".\';echo;sleep 10\"")
 
 GENERATE_KEY_NAMES = ['rsa', 'dsa', 'ecdsa', 'ed25519']
 KEY_FILE_TPL = '/etc/ssh/ssh_host_%s_key'
@@ -183,33 +189,45 @@ def handle(_name, cfg, cloud, log, _args):
     try:
         (users, _groups) = ug_util.normalize_users_groups(cfg, cloud.distro)
         (user, _user_config) = ug_util.extract_default(users)
+        ssh_disable_users = util.get_cfg_option_list(cfg, "ssh_disable_users",
+                                                     [])
         disable_root = util.get_cfg_option_bool(cfg, "disable_root", True)
         disable_root_opts = util.get_cfg_option_str(cfg, "disable_root_opts",
                                                     DISABLE_ROOT_OPTS)
 
+        ssh_disable_users = list(set(ssh_disable_users).difference(set(users)))
+        for ssh_user in ssh_disable_users:
+            cloud.distro.create_user(ssh_user, **cfg)
+
+        if disable_root:
+            ssh_disable_users.append("root")
+
         keys = cloud.get_public_ssh_keys() or []
         if "ssh_authorized_keys" in cfg:
             cfgkeys = cfg["ssh_authorized_keys"]
             keys.extend(cfgkeys)
 
-        apply_credentials(keys, user, disable_root, disable_root_opts)
+        apply_credentials(keys, user, ssh_disable_users, disable_root_opts)
     except Exception:
         util.logexc(log, "Applying ssh credentials failed!")
 
 
-def apply_credentials(keys, user, disable_root, disable_root_opts):
+def apply_credentials(keys, user, ssh_disable_users, disable_root_opts):
 
     keys = set(keys)
+    ssh_disable_users = set(ssh_disable_users)
     if user:
         ssh_util.setup_user_keys(keys, user)
 
-    if disable_root:
-        if not user:
-            user = "NONE"
-        key_prefix = disable_root_opts.replace('$USER', user)
-    else:
-        key_prefix = ''
+    if 'root' not in ssh_disable_users:
+        ssh_util.setup_user_keys(keys, 'root', options='')
+
+    if not user:
+        user = "NONE"
+    key_prefix = disable_root_opts.replace('$USER', user)
 
-    ssh_util.setup_user_keys(keys, 'root', options=key_prefix)
+    for disable_user in ssh_disable_users:
+        disable_prefix = key_prefix.replace('$ROOT', disable_user)
+        ssh_util.setup_user_keys(keys, disable_user, options=disable_prefix)
 
 # vi: ts=4 expandtab
diff --git a/cloudinit/config/tests/test_ssh.py b/cloudinit/config/tests/test_ssh.py
new file mode 100644
index 0000000..3da4330
--- /dev/null
+++ b/cloudinit/config/tests/test_ssh.py
@@ -0,0 +1,277 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import mock
+
+from cloudinit.config import cc_ssh
+from cloudinit.tests.helpers import CiTestCase
+
+MODPATH = "cloudinit.config.cc_ssh."
+
+
+class TestHandleSsh(CiTestCase):
+    """Test cc_ssh handling of ssh config."""
+
+    with_logs = True
+
+    def _assert_has_calls(self, mock, expected_list):
+        """Compare the expected call list with the mock's in any order."""
+        mock_list = mock.call_args_list[:]
+        for expected_call in expected_list:
+            if expected_call in mock_list:
+                mock_list.pop(mock_list.index(expected_call))
+            else:
+                self.assertIn(expected_call, mock_list)
+
+    @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+    def test_apply_credentials_with_user(self, m_suk):
+        """Apply keys for the given user and root."""
+        keys = ["key1"]
+        user = "clouduser"
+        options = cc_ssh.DISABLE_ROOT_OPTS
+        cc_ssh.apply_credentials(keys, user, [], options)
+        self.assertEqual([mock.call(set(keys), user),
+                          mock.call(set(keys), "root", options="")],
+                         m_suk.call_args_list)
+
+    @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+    def test_apply_credentials_with_no_user(self, m_suk):
+        """Apply keys for root only."""
+        keys = ["key1"]
+        user = None
+        options = cc_ssh.DISABLE_ROOT_OPTS
+        cc_ssh.apply_credentials(keys, user, [], options)
+        self.assertEqual([mock.call(set(keys), "root", options="")],
+                         m_suk.call_args_list)
+
+    @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+    def test_apply_credentials_with_user_disable_root(self, m_suk):
+        """Apply keys for the given user and disable root ssh."""
+        keys = ["key1"]
+        user = "clouduser"
+        options = cc_ssh.DISABLE_ROOT_OPTS
+        cc_ssh.apply_credentials(keys, user, ["root"], options)
+        options = options.replace("$USER", user)
+        options = options.replace("$ROOT", "root")
+        self.assertEqual([mock.call(set(keys), user),
+                          mock.call(set(keys), "root", options=options)],
+                         m_suk.call_args_list)
+
+    @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+    def test_apply_credentials_with_no_user_disable_root(self, m_suk):
+        """Apply keys no user and disable root ssh."""
+        keys = ["key1"]
+        user = None
+        options = cc_ssh.DISABLE_ROOT_OPTS
+        cc_ssh.apply_credentials(keys, user, ["root"], options)
+        options = options.replace("$USER", "NONE")
+        options = options.replace("$ROOT", "root")
+        self.assertEqual([mock.call(set(keys), "root", options=options)],
+                         m_suk.call_args_list)
+
+    @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+    def test_apply_credentials_with_user_and_disable_list(self, m_suk):
+        """Apply keys for the given user and disable a list of users."""
+        keys = ["key1"]
+        user = "clouduser"
+        options = cc_ssh.DISABLE_ROOT_OPTS
+        cc_ssh.apply_credentials(keys, user, ["root", "disable_me"], options)
+        options = options.replace("$USER", user)
+        options_root = options.replace("$ROOT", "root")
+        options_disable_me = options.replace("$ROOT", "disable_me")
+        self._assert_has_calls(
+            m_suk,
+            [mock.call(set(keys), user),
+             mock.call(set(keys), "root", options=options_root),
+             mock.call(set(keys), "disable_me", options=options_disable_me)])
+
+    @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+    def test_apply_credentials_with_user_and_disable_non_root(self, m_suk):
+        """Apply keys for the given user and disable non-root users."""
+        keys = ["key1"]
+        user = "clouduser"
+        options = cc_ssh.DISABLE_ROOT_OPTS
+        cc_ssh.apply_credentials(keys, user, ["disable_me", "disable_you"],
+                                 options)
+        options = options.replace("$USER", user)
+        options_disable_me = options.replace("$ROOT", "disable_me")
+        options_disable_you = options.replace("$ROOT", "disable_you")
+        print(dir(mock))
+        self._assert_has_calls(
+            m_suk,
+            [mock.call(set(keys), user),
+             mock.call(set(keys), "root", options=""),
+             mock.call(set(keys), "disable_me", options=options_disable_me),
+             mock.call(set(keys), "disable_you", options=options_disable_you)])
+
+    @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+    @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+    @mock.patch(MODPATH + "os.path.exists")
+    def test_handle_no_cfg(self, m_ope, m_nug, m_suk):
+        """Test handle with no config and no distro user."""
+        cfg = {}
+        keys = ["key1"]
+        # Mock os.path.exits to True to short-circuit the key writing logic
+        m_ope.return_value = True
+        m_nug.return_value = ([], {})
+        cloud = mock.Mock()
+        cloud.distro = mock.Mock()
+        cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+        cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+        options = cc_ssh.DISABLE_ROOT_OPTS
+        options = options.replace("$USER", "NONE")
+        options = options.replace("$ROOT", "root")
+        self.assertEqual([mock.call(set(keys), "root", options=options)],
+                         m_suk.call_args_list)
+
+    @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+    @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+    @mock.patch(MODPATH + "os.path.exists")
+    def test_handle_no_cfg_and_default_root(self, m_ope, m_nug, m_suk):
+        """Test handle with no config and a default distro user."""
+        cfg = {}
+        keys = ["key1"]
+        user = "clouduser"
+        # Mock os.path.exits to True to short-circuit the key writing logic
+        m_ope.return_value = True
+        m_nug.return_value = ({user: {"default": user}}, {})
+        cloud = mock.Mock()
+        cloud.distro = mock.Mock()
+        cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+        cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+        options = cc_ssh.DISABLE_ROOT_OPTS
+        options = options.replace("$USER", user)
+        options = options.replace("$ROOT", "root")
+        self._assert_has_calls(
+            m_suk,
+            [mock.call(set(keys), user),
+             mock.call(set(keys), "root", options=options)])
+
+    @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+    @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+    @mock.patch(MODPATH + "os.path.exists")
+    def test_handle_cfg_with_explicit_disable_root(self, m_ope, m_nug, m_suk):
+        """Test handle with explicit disable_root and a default distro user."""
+        # This test is identical to test_handle_no_cfg_and_default_root,
+        # except this uses an explicit cfg value
+        cfg = {"disable_root": True}
+        keys = ["key1"]
+        user = "clouduser"
+        # Mock os.path.exits to True to short-circuit the key writing logic
+        m_ope.return_value = True
+        m_nug.return_value = ({user: {"default": user}}, {})
+        cloud = mock.Mock()
+        cloud.distro = mock.Mock()
+        cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+        cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+        options = cc_ssh.DISABLE_ROOT_OPTS
+        options = options.replace("$USER", user)
+        options = options.replace("$ROOT", "root")
+        self._assert_has_calls(
+            m_suk,
+            [mock.call(set(keys), user),
+             mock.call(set(keys), "root", options=options)])
+
+    @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+    @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+    @mock.patch(MODPATH + "os.path.exists")
+    def test_handle_cfg_with_disable_root_false(self, m_ope, m_nug, m_suk):
+        """Test handle with disable_root == False."""
+        # When disable_root == False, the ssh redirect for root is skipped
+        cfg = {"disable_root": False}
+        keys = ["key1"]
+        user = "clouduser"
+        # Mock os.path.exits to True to short-circuit the key writing logic
+        m_ope.return_value = True
+        m_nug.return_value = ({user: {"default": user}}, {})
+        cloud = mock.Mock()
+        cloud.distro = mock.Mock()
+        cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+        cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+        self._assert_has_calls(
+            m_suk,
+            [mock.call(set(keys), user),
+             mock.call(set(keys), "root", options="")])
+
+    @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+    @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+    @mock.patch(MODPATH + "os.path.exists")
+    def test_handle_cfg_with_ssh_disable_users(self, m_ope, m_nug, m_suk):
+        """Test handle with ssh_disable_users."""
+        cfg = {"ssh_disable_users": ["disable_me"]}
+        keys = ["key1"]
+        user = "clouduser"
+        # Mock os.path.exits to True to short-circuit the key writing logic
+        m_ope.return_value = True
+        m_nug.return_value = ({user: {"default": user}}, {})
+        cloud = mock.Mock()
+        cloud.distro = mock.Mock()
+        cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+        cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+        options = cc_ssh.DISABLE_ROOT_OPTS
+        options = options.replace("$USER", user)
+        options_root = options.replace("$ROOT", "root")
+        options_disable_me = options.replace("$ROOT", "disable_me")
+        # This will include "root" as "disable_root: True" is the default
+        self._assert_has_calls(
+            m_suk,
+            [mock.call(set(keys), user),
+             mock.call(set(keys), "root", options=options_root),
+             mock.call(set(keys), "disable_me", options=options_disable_me)])
+
+    @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+    @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+    @mock.patch(MODPATH + "os.path.exists")
+    def test_handle_cfg_with_ssh_disable_users_and_no_root(self, m_ope, m_nug,
+                                                           m_suk):
+        """Test handle with ssh_disable_users and disable_root == False."""
+        cfg = {"disable_root": False,
+               "ssh_disable_users": ["disable_me"]}
+        keys = ["key1"]
+        user = "clouduser"
+        # Mock os.path.exits to True to short-circuit the key writing logic
+        m_ope.return_value = True
+        m_nug.return_value = ({user: {"default": user}}, {})
+        cloud = mock.Mock()
+        cloud.distro = mock.Mock()
+        cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+        cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+        options = cc_ssh.DISABLE_ROOT_OPTS
+        options = options.replace("$USER", user)
+        options = options.replace("$ROOT", "disable_me")
+        self._assert_has_calls(
+            m_suk,
+            [mock.call(set(keys), user),
+             mock.call(set(keys), "root", options=""),
+             mock.call(set(keys), "disable_me", options=options)])
+
+    @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+    @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+    @mock.patch(MODPATH + "os.path.exists")
+    def test_handle_with_matching_ssh_disable_users_and_user(self, m_ope,
+                                                             m_nug, m_suk):
+        """Test handle with a matching user and ssh_disable_users entry."""
+        matching_user = "matching"
+        cfg = {"disable_root": False,
+               "ssh_disable_users": [matching_user]}
+        keys = ["key1"]
+        user = "clouduser"
+        # Mock os.path.exits to True to short-circuit the key writing logic
+        m_ope.return_value = True
+        m_nug.return_value = ({user: {"default": user},
+                               matching_user: {"default": False}}, {})
+        cloud = mock.Mock()
+        cloud.distro = mock.Mock()
+        cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+        cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+        self.assertEqual(m_suk.call_count, 2)
+        self._assert_has_calls(
+            m_suk,
+            [mock.call(set(keys), user),
+             mock.call(set(keys), "root", options="")])
diff --git a/doc/examples/cloud-config.txt b/doc/examples/cloud-config.txt
index 774f66b..b6970a8 100644
--- a/doc/examples/cloud-config.txt
+++ b/doc/examples/cloud-config.txt
@@ -228,13 +228,25 @@ byobu_by_default: system
 # default: true
 disable_root: false
 
+# disable ssh access for a list of users
+# Block ssh login to these users and redirect to the 'ubuntu' or default distro
+# user. This is useful for clouds which want to document a standard user across
+# all environments, which then conflicts with the idea of a default distro
+# user. This provides the hint of which user to use.
+# default: []
+ssh_disable_users:
+ - cloud_default_user
+
 # disable_root_opts: the value of this variable will prefix the
 # respective key in /root/.ssh/authorized_keys if disable_root is true
+# and /home/$ENTRY/.ssh/authorized_keys for any ENTRY in the
+# ssh_disable_users list
 # see 'man authorized_keys' for more information on what you can do here
 #
 # The string '$USER' will be replaced with the username of the default user
+# and the string '$ROOT' will be replace with the username being redirected.
 #
-# disable_root_opts: no-port-forwarding,no-agent-forwarding,no-X11-forwarding,command="echo 'Please login as the user \"$USER\" rather than the user \"root\".';echo;sleep 10"
+# disable_root_opts: no-port-forwarding,no-agent-forwarding,no-X11-forwarding,command="echo 'Please login as the user \"$USER\" rather than the user \"$ROOT\".';echo;sleep 10"
 
 
 # set the locale to a given locale

Follow ups