cloud-init-dev team mailing list archive
-
cloud-init-dev team
-
Mailing list archive
-
Message #05258
[Merge] ~fginther/cloud-init:feature/ssh_disable_users into cloud-init:master
Francis Ginther has proposed merging ~fginther/cloud-init:feature/ssh_disable_users into cloud-init:master.
Commit message:
Add a configuration option, 'ssh_disable_users', for declaring a list of usernames to disable login via ssh and redirect to the default user.
Also adds unit tests for config/cc_ssh.py to verify both the pre-existing and the new behavior.
Requested reviews:
cloud-init commiters (cloud-init-dev)
Related bugs:
Bug #1771198 in cloud-init: "Support disable_root-esque behaviour for other users"
https://bugs.launchpad.net/cloud-init/+bug/1771198
For more details, see:
https://code.launchpad.net/~fginther/cloud-init/+git/cloud-init/+merge/352053
This re-implements the 'disable_root' option for a list of users, instead of just root.
Testing is provided through unit tests. These did not exist for the config/cc_ssh.py module, so a basic set of tests were created to cover the existing 'disable_root' behavior. These tests were then expanded an modified to match the 'ssh_disable_users' implementation. The 'disable_root: true' option still exists, but it will be converted to 'ssh_disable_users: ["root"]' for processing.
--
Your team cloud-init commiters is requested to review the proposed merge of ~fginther/cloud-init:feature/ssh_disable_users into cloud-init:master.
diff --git a/cloudinit/config/cc_ssh.py b/cloudinit/config/cc_ssh.py
old mode 100755
new mode 100644
index 45204a0..52c819b
--- a/cloudinit/config/cc_ssh.py
+++ b/cloudinit/config/cc_ssh.py
@@ -55,6 +55,11 @@ root login is disabled, and root login opts are set to::
no-port-forwarding,no-agent-forwarding,no-X11-forwarding
+Login for other users can similarly be disabled with the ``ssh_disable_users``
+config list. Users in this list will have the same ``disable_root_opts``
+applied and references to the string ``$ROOT`` will be replace with the user
+being redirected.
+
Authorized keys for the default user/first user defined in ``users`` can be
specified using `ssh_authorized_keys``. Keys should be specified as a list of
public keys.
@@ -87,6 +92,7 @@ public keys.
dsa_public: ssh-dsa AAAAB3NzaC1yc2EAAAABIwAAAGEAoPRhIfLvedSDKw7Xd ...
ssh_genkeytypes: <key type>
disable_root: <true/false>
+ ssh_disable_users: <list of user to block and redirect to the default user>
disable_root_opts: <disable root options string>
ssh_authorized_keys:
- ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEA3FSyQwBI6Z+nCSjUU ...
@@ -104,7 +110,7 @@ from cloudinit import util
DISABLE_ROOT_OPTS = (
"no-port-forwarding,no-agent-forwarding,"
"no-X11-forwarding,command=\"echo \'Please login as the user \\\"$USER\\\""
- " rather than the user \\\"root\\\".\';echo;sleep 10\"")
+ " rather than the user \\\"$ROOT\\\".\';echo;sleep 10\"")
GENERATE_KEY_NAMES = ['rsa', 'dsa', 'ecdsa', 'ed25519']
KEY_FILE_TPL = '/etc/ssh/ssh_host_%s_key'
@@ -183,33 +189,45 @@ def handle(_name, cfg, cloud, log, _args):
try:
(users, _groups) = ug_util.normalize_users_groups(cfg, cloud.distro)
(user, _user_config) = ug_util.extract_default(users)
+ ssh_disable_users = util.get_cfg_option_list(cfg, "ssh_disable_users",
+ [])
disable_root = util.get_cfg_option_bool(cfg, "disable_root", True)
disable_root_opts = util.get_cfg_option_str(cfg, "disable_root_opts",
DISABLE_ROOT_OPTS)
+ ssh_disable_users = list(set(ssh_disable_users).difference(set(users)))
+ for ssh_user in ssh_disable_users:
+ cloud.distro.create_user(ssh_user, **cfg)
+
+ if disable_root:
+ ssh_disable_users.append("root")
+
keys = cloud.get_public_ssh_keys() or []
if "ssh_authorized_keys" in cfg:
cfgkeys = cfg["ssh_authorized_keys"]
keys.extend(cfgkeys)
- apply_credentials(keys, user, disable_root, disable_root_opts)
+ apply_credentials(keys, user, ssh_disable_users, disable_root_opts)
except Exception:
util.logexc(log, "Applying ssh credentials failed!")
-def apply_credentials(keys, user, disable_root, disable_root_opts):
+def apply_credentials(keys, user, ssh_disable_users, disable_root_opts):
keys = set(keys)
+ ssh_disable_users = set(ssh_disable_users)
if user:
ssh_util.setup_user_keys(keys, user)
- if disable_root:
- if not user:
- user = "NONE"
- key_prefix = disable_root_opts.replace('$USER', user)
- else:
- key_prefix = ''
+ if 'root' not in ssh_disable_users:
+ ssh_util.setup_user_keys(keys, 'root', options='')
+
+ if not user:
+ user = "NONE"
+ key_prefix = disable_root_opts.replace('$USER', user)
- ssh_util.setup_user_keys(keys, 'root', options=key_prefix)
+ for disable_user in ssh_disable_users:
+ disable_prefix = key_prefix.replace('$ROOT', disable_user)
+ ssh_util.setup_user_keys(keys, disable_user, options=disable_prefix)
# vi: ts=4 expandtab
diff --git a/cloudinit/config/tests/test_ssh.py b/cloudinit/config/tests/test_ssh.py
new file mode 100644
index 0000000..3da4330
--- /dev/null
+++ b/cloudinit/config/tests/test_ssh.py
@@ -0,0 +1,277 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import mock
+
+from cloudinit.config import cc_ssh
+from cloudinit.tests.helpers import CiTestCase
+
+MODPATH = "cloudinit.config.cc_ssh."
+
+
+class TestHandleSsh(CiTestCase):
+ """Test cc_ssh handling of ssh config."""
+
+ with_logs = True
+
+ def _assert_has_calls(self, mock, expected_list):
+ """Compare the expected call list with the mock's in any order."""
+ mock_list = mock.call_args_list[:]
+ for expected_call in expected_list:
+ if expected_call in mock_list:
+ mock_list.pop(mock_list.index(expected_call))
+ else:
+ self.assertIn(expected_call, mock_list)
+
+ @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+ def test_apply_credentials_with_user(self, m_suk):
+ """Apply keys for the given user and root."""
+ keys = ["key1"]
+ user = "clouduser"
+ options = cc_ssh.DISABLE_ROOT_OPTS
+ cc_ssh.apply_credentials(keys, user, [], options)
+ self.assertEqual([mock.call(set(keys), user),
+ mock.call(set(keys), "root", options="")],
+ m_suk.call_args_list)
+
+ @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+ def test_apply_credentials_with_no_user(self, m_suk):
+ """Apply keys for root only."""
+ keys = ["key1"]
+ user = None
+ options = cc_ssh.DISABLE_ROOT_OPTS
+ cc_ssh.apply_credentials(keys, user, [], options)
+ self.assertEqual([mock.call(set(keys), "root", options="")],
+ m_suk.call_args_list)
+
+ @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+ def test_apply_credentials_with_user_disable_root(self, m_suk):
+ """Apply keys for the given user and disable root ssh."""
+ keys = ["key1"]
+ user = "clouduser"
+ options = cc_ssh.DISABLE_ROOT_OPTS
+ cc_ssh.apply_credentials(keys, user, ["root"], options)
+ options = options.replace("$USER", user)
+ options = options.replace("$ROOT", "root")
+ self.assertEqual([mock.call(set(keys), user),
+ mock.call(set(keys), "root", options=options)],
+ m_suk.call_args_list)
+
+ @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+ def test_apply_credentials_with_no_user_disable_root(self, m_suk):
+ """Apply keys no user and disable root ssh."""
+ keys = ["key1"]
+ user = None
+ options = cc_ssh.DISABLE_ROOT_OPTS
+ cc_ssh.apply_credentials(keys, user, ["root"], options)
+ options = options.replace("$USER", "NONE")
+ options = options.replace("$ROOT", "root")
+ self.assertEqual([mock.call(set(keys), "root", options=options)],
+ m_suk.call_args_list)
+
+ @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+ def test_apply_credentials_with_user_and_disable_list(self, m_suk):
+ """Apply keys for the given user and disable a list of users."""
+ keys = ["key1"]
+ user = "clouduser"
+ options = cc_ssh.DISABLE_ROOT_OPTS
+ cc_ssh.apply_credentials(keys, user, ["root", "disable_me"], options)
+ options = options.replace("$USER", user)
+ options_root = options.replace("$ROOT", "root")
+ options_disable_me = options.replace("$ROOT", "disable_me")
+ self._assert_has_calls(
+ m_suk,
+ [mock.call(set(keys), user),
+ mock.call(set(keys), "root", options=options_root),
+ mock.call(set(keys), "disable_me", options=options_disable_me)])
+
+ @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+ def test_apply_credentials_with_user_and_disable_non_root(self, m_suk):
+ """Apply keys for the given user and disable non-root users."""
+ keys = ["key1"]
+ user = "clouduser"
+ options = cc_ssh.DISABLE_ROOT_OPTS
+ cc_ssh.apply_credentials(keys, user, ["disable_me", "disable_you"],
+ options)
+ options = options.replace("$USER", user)
+ options_disable_me = options.replace("$ROOT", "disable_me")
+ options_disable_you = options.replace("$ROOT", "disable_you")
+ print(dir(mock))
+ self._assert_has_calls(
+ m_suk,
+ [mock.call(set(keys), user),
+ mock.call(set(keys), "root", options=""),
+ mock.call(set(keys), "disable_me", options=options_disable_me),
+ mock.call(set(keys), "disable_you", options=options_disable_you)])
+
+ @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_no_cfg(self, m_ope, m_nug, m_suk):
+ """Test handle with no config and no distro user."""
+ cfg = {}
+ keys = ["key1"]
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_ope.return_value = True
+ m_nug.return_value = ([], {})
+ cloud = mock.Mock()
+ cloud.distro = mock.Mock()
+ cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+ cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+ options = cc_ssh.DISABLE_ROOT_OPTS
+ options = options.replace("$USER", "NONE")
+ options = options.replace("$ROOT", "root")
+ self.assertEqual([mock.call(set(keys), "root", options=options)],
+ m_suk.call_args_list)
+
+ @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_no_cfg_and_default_root(self, m_ope, m_nug, m_suk):
+ """Test handle with no config and a default distro user."""
+ cfg = {}
+ keys = ["key1"]
+ user = "clouduser"
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_ope.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = mock.Mock()
+ cloud.distro = mock.Mock()
+ cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+ cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+ options = cc_ssh.DISABLE_ROOT_OPTS
+ options = options.replace("$USER", user)
+ options = options.replace("$ROOT", "root")
+ self._assert_has_calls(
+ m_suk,
+ [mock.call(set(keys), user),
+ mock.call(set(keys), "root", options=options)])
+
+ @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_cfg_with_explicit_disable_root(self, m_ope, m_nug, m_suk):
+ """Test handle with explicit disable_root and a default distro user."""
+ # This test is identical to test_handle_no_cfg_and_default_root,
+ # except this uses an explicit cfg value
+ cfg = {"disable_root": True}
+ keys = ["key1"]
+ user = "clouduser"
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_ope.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = mock.Mock()
+ cloud.distro = mock.Mock()
+ cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+ cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+ options = cc_ssh.DISABLE_ROOT_OPTS
+ options = options.replace("$USER", user)
+ options = options.replace("$ROOT", "root")
+ self._assert_has_calls(
+ m_suk,
+ [mock.call(set(keys), user),
+ mock.call(set(keys), "root", options=options)])
+
+ @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_cfg_with_disable_root_false(self, m_ope, m_nug, m_suk):
+ """Test handle with disable_root == False."""
+ # When disable_root == False, the ssh redirect for root is skipped
+ cfg = {"disable_root": False}
+ keys = ["key1"]
+ user = "clouduser"
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_ope.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = mock.Mock()
+ cloud.distro = mock.Mock()
+ cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+ cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+ self._assert_has_calls(
+ m_suk,
+ [mock.call(set(keys), user),
+ mock.call(set(keys), "root", options="")])
+
+ @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_cfg_with_ssh_disable_users(self, m_ope, m_nug, m_suk):
+ """Test handle with ssh_disable_users."""
+ cfg = {"ssh_disable_users": ["disable_me"]}
+ keys = ["key1"]
+ user = "clouduser"
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_ope.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = mock.Mock()
+ cloud.distro = mock.Mock()
+ cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+ cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+ options = cc_ssh.DISABLE_ROOT_OPTS
+ options = options.replace("$USER", user)
+ options_root = options.replace("$ROOT", "root")
+ options_disable_me = options.replace("$ROOT", "disable_me")
+ # This will include "root" as "disable_root: True" is the default
+ self._assert_has_calls(
+ m_suk,
+ [mock.call(set(keys), user),
+ mock.call(set(keys), "root", options=options_root),
+ mock.call(set(keys), "disable_me", options=options_disable_me)])
+
+ @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_cfg_with_ssh_disable_users_and_no_root(self, m_ope, m_nug,
+ m_suk):
+ """Test handle with ssh_disable_users and disable_root == False."""
+ cfg = {"disable_root": False,
+ "ssh_disable_users": ["disable_me"]}
+ keys = ["key1"]
+ user = "clouduser"
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_ope.return_value = True
+ m_nug.return_value = ({user: {"default": user}}, {})
+ cloud = mock.Mock()
+ cloud.distro = mock.Mock()
+ cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+ cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+ options = cc_ssh.DISABLE_ROOT_OPTS
+ options = options.replace("$USER", user)
+ options = options.replace("$ROOT", "disable_me")
+ self._assert_has_calls(
+ m_suk,
+ [mock.call(set(keys), user),
+ mock.call(set(keys), "root", options=""),
+ mock.call(set(keys), "disable_me", options=options)])
+
+ @mock.patch(MODPATH + "ssh_util.setup_user_keys")
+ @mock.patch(MODPATH + "ug_util.normalize_users_groups")
+ @mock.patch(MODPATH + "os.path.exists")
+ def test_handle_with_matching_ssh_disable_users_and_user(self, m_ope,
+ m_nug, m_suk):
+ """Test handle with a matching user and ssh_disable_users entry."""
+ matching_user = "matching"
+ cfg = {"disable_root": False,
+ "ssh_disable_users": [matching_user]}
+ keys = ["key1"]
+ user = "clouduser"
+ # Mock os.path.exits to True to short-circuit the key writing logic
+ m_ope.return_value = True
+ m_nug.return_value = ({user: {"default": user},
+ matching_user: {"default": False}}, {})
+ cloud = mock.Mock()
+ cloud.distro = mock.Mock()
+ cloud.get_public_ssh_keys = mock.Mock(return_value=keys)
+ cc_ssh.handle("name", cfg, cloud, self.logger, None)
+
+ self.assertEqual(m_suk.call_count, 2)
+ self._assert_has_calls(
+ m_suk,
+ [mock.call(set(keys), user),
+ mock.call(set(keys), "root", options="")])
diff --git a/doc/examples/cloud-config.txt b/doc/examples/cloud-config.txt
index 774f66b..b6970a8 100644
--- a/doc/examples/cloud-config.txt
+++ b/doc/examples/cloud-config.txt
@@ -228,13 +228,25 @@ byobu_by_default: system
# default: true
disable_root: false
+# disable ssh access for a list of users
+# Block ssh login to these users and redirect to the 'ubuntu' or default distro
+# user. This is useful for clouds which want to document a standard user across
+# all environments, which then conflicts with the idea of a default distro
+# user. This provides the hint of which user to use.
+# default: []
+ssh_disable_users:
+ - cloud_default_user
+
# disable_root_opts: the value of this variable will prefix the
# respective key in /root/.ssh/authorized_keys if disable_root is true
+# and /home/$ENTRY/.ssh/authorized_keys for any ENTRY in the
+# ssh_disable_users list
# see 'man authorized_keys' for more information on what you can do here
#
# The string '$USER' will be replaced with the username of the default user
+# and the string '$ROOT' will be replace with the username being redirected.
#
-# disable_root_opts: no-port-forwarding,no-agent-forwarding,no-X11-forwarding,command="echo 'Please login as the user \"$USER\" rather than the user \"root\".';echo;sleep 10"
+# disable_root_opts: no-port-forwarding,no-agent-forwarding,no-X11-forwarding,command="echo 'Please login as the user \"$USER\" rather than the user \"$ROOT\".';echo;sleep 10"
# set the locale to a given locale
Follow ups