curtin-dev team mailing list archive
-
curtin-dev team
-
Mailing list archive
-
Message #03743
[Merge] ~bryanfraschetti/curtin:custom-apt-keys into curtin:master
Bryan Fraschetti has proposed merging ~bryanfraschetti/curtin:custom-apt-keys into curtin:master.
Commit message:
feat: Custom keys for apt archives
Users with local Ubuntu archive mirrors and Landscape instances have been unable to specify the corresponding gpg keys with primary and security mirrors. This resulted in errors such as "NO_PUBKEY" on commands such as "apt update" This issue appears to be present in both curtin and cloud-init
This commit adds the functionality to read a user specified "key" field in the primary and security sections of the apt declaration in a cloud-init-config.yaml. The provided key can be raw or path based, and is formatted appropriately. The function that provides this feature, get_mirror_key was modelled after the existing function, get_mirror, which obtains user specified archive URIs. A deb822 template was added to reflect the new primary and security key parameters by adding variables for "Signed-By." Though it is not clear to me from where/how the template is pulled in a real world environment. That is to say some changes will need to be made to use the new template outside of testing
If no primary key is declared, it defaults to the distro gpg key in /usr/share/keyrings. If no security mirror is declared, the corresponding key falls back on the primary key. This is to match the existing behaviour where the security mirror falls back on the primary mirror URI. Therefore, when no keys are specified, the behaviour becomes indistinguishable from the current cloud-init implementation
Keys can also be specified using a keyid and optionally a keyserver. When this is the case, the key is obtained and artificially added to the parsed mirror config and treated as described above.
The commit has new unit tests in test_apt_configure_sources_list_v3.py
Related cloud-init PR: https://github.com/canonical/cloud-init/pull/5828
Requested reviews:
curtin developers (curtin-dev)
Related bugs:
Bug #2059414 in curtin: "Syntax error on curtin/block/__init__.py#L317-L318"
https://bugs.launchpad.net/curtin/+bug/2059414
For more details, see:
https://code.launchpad.net/~bryanfraschetti/curtin/+git/curtin/+merge/476864
--
Your team curtin developers is requested to review the proposed merge of ~bryanfraschetti/curtin:custom-apt-keys into curtin:master.
diff --git a/curtin/block/__init__.py b/curtin/block/__init__.py
index 0de202a..b927695 100644
--- a/curtin/block/__init__.py
+++ b/curtin/block/__init__.py
@@ -315,7 +315,7 @@ def get_partition_sfdisk_info(devpath, sfdisk_info=None):
if os.path.realpath(part['node']) == os.path.realpath(devpath)]
if len(entry) != 1:
raise RuntimeError('Device %s not present in sfdisk dump:\n%s' %
- devpath, util.json_dumps(sfdisk_info))
+ (devpath, util.json_dumps(sfdisk_info)))
return entry.pop()
diff --git a/curtin/commands/apt_config.py b/curtin/commands/apt_config.py
index d38cc4c..bc5cfb2 100644
--- a/curtin/commands/apt_config.py
+++ b/curtin/commands/apt_config.py
@@ -55,6 +55,19 @@ APT_SOURCES_PROPOSED_DEB822 = (
'Signed-By: /usr/share/keyrings/ubuntu-archive-keyring.gpg\n'
)
+APT_SOURCES_TEMPLATE_DEB822 = (
+ 'Types: deb\n'
+ 'URIs: $PRIMARY\n'
+ 'Suites: $RELEASE $RELEASE-updates $RELEASE-backports\n'
+ 'Components: main restricted universe multiverse\n'
+ 'Signed-By: $PRIMARY_KEY\n\n'
+ 'Types: deb\n'
+ 'URIs: $SECURITY\n'
+ 'Suites: $RELEASE-security\n'
+ 'Components: main restricted universe multiverse\n'
+ 'Signed-By: $SECURITY_KEY\n'
+)
+
def get_default_mirrors(arch=None):
"""returns the default mirrors for the target. These depend on the
@@ -638,6 +651,14 @@ def _generate_sources_deb822(cfg, release, mirrors, target=None, arch=None):
entry['Suites'] = list(set(entry['Suites']) - suites_to_disable)
entry['Components'] = list(set(entry['Components']) - comps_to_disable)
+ # Get key type (primary or security) and ignore preceding dollar sign,
+ # which specifies the template variable
+ key_type = entry['Signed-By'][1:]
+ if key_type == "PRIMARY_KEY" or key_type == "SECURITY_KEY":
+ default_key = "/usr/share/keyrings/ubuntu-archive-keyring.gpg"
+ key = mirrors.get(key_type, default_key)
+ entry['Signed-By'] = [util.render_string(key, {key_type: key})]
+
if not entry['Suites']:
# It is invalid for a stanza to have zero suite configured. In
# practise, it can happen when -security is disabled.
@@ -827,6 +848,31 @@ def search_for_mirror(candidates):
return None
+def update_mirror_keys(pmirror_key, smirror_key):
+ """Sets keys for primary and security mirrors and
+ returns default if no keys are defined
+ """
+
+ # Fallback key if none provided
+ default = "/usr/share/keyrings/ubuntu-archive-keyring.gpg"
+
+ if pmirror_key and smirror_key:
+ # Config defined primary and security sources
+ return {"PRIMARY_KEY": pmirror_key, "SECURITY_KEY": smirror_key}
+ elif pmirror_key and not smirror_key:
+ # Primary source was defined, but security was not
+ # Use primary source (and therefore key) for security repo
+ LOG.info("Using primary key for security mirror")
+ return {"PRIMARY_KEY": pmirror_key, "SECURITY_KEY": pmirror_key}
+
+ LOG.info("Setting default key for primary and security mirrors")
+ return {
+ # Primary key not defined, use default archives
+ "PRIMARY_KEY": default,
+ "SECURITY_KEY": default,
+ }
+
+
def update_mirror_info(pmirror, smirror, arch):
"""sets security mirror to primary if not defined.
returns defaults if no mirrors are defined"""
@@ -857,6 +903,62 @@ def get_arch_mirrorconfig(cfg, mirrortype, arch):
return default
+def format_security_key(key):
+ """Ubuntu Deb822 template Signed-By expects the raw key to be
+ indented by two spaces on all lines except the first line, which
+ is agnostic to prefaced whitespace
+ This function formats raw keys accordingly. It also works for
+ formatting path based keys and keyservers since they pass through
+ unaffected
+ """
+
+ lines = key.splitlines()
+ first_line = lines[0]
+ indented_lines = [" " + line for line in lines[1:]]
+ formatted_key = "\n".join([first_line] + indented_lines)
+
+ return formatted_key
+
+
+def get_mirror_key(cfg, mirrortype, arch):
+ """Obtain custom specified gpg key for apt repository
+ in config. Particularly useful for local/on-premise/landscape
+ servers which may be signed by custom keys
+ """
+
+ LOG.debug("Checking %s mirror for provided key", mirrortype)
+
+ mcfg = get_arch_mirrorconfig(cfg, mirrortype, arch)
+ if mcfg is None:
+ LOG.debug("No %s configuration provided", mirrortype)
+ return None
+
+ LOG.debug("Configuration for %s mirror provided", mirrortype)
+
+ if "keyid" in mcfg and "key" not in mcfg:
+ LOG.debug("Key provided in the form of keyid")
+ keyserver = DEFAULT_KEYSERVER
+ if "keyserver" in mcfg:
+ keyserver = mcfg["keyserver"]
+ # Retrieve key from remote and store in mirror config
+ mcfg["key"] = gpg.getkeybyid(mcfg["keyid"], keyserver)
+
+ # Fallback key if none provided
+ default = "/usr/share/keyrings/ubuntu-archive-keyring.gpg"
+
+ mirror_key = mcfg.get("key", None)
+ if mirror_key:
+ # Source and key are explicitly defined
+ LOG.info("Setting key for %s mirror", mirrortype)
+ mirror_key = format_security_key(mirror_key)
+ else:
+ # Source specified, but key not specified
+ LOG.info("Setting default key for %s mirror", mirrortype)
+ mirror_key = default
+
+ return mirror_key
+
+
def get_mirror(cfg, mirrortype, arch):
"""pass the three potential stages of mirror specification
returns None is neither of them found anything otherwise the first
@@ -887,17 +989,24 @@ def find_apt_mirror_info(cfg, arch=None):
if arch is None:
arch = distro.get_architecture()
LOG.debug("got arch for mirror selection: %s", arch)
+
pmirror = get_mirror(cfg, "primary", arch)
LOG.debug("got primary mirror: %s", pmirror)
+ pmirror_key = get_mirror_key(cfg, "primary", arch)
+
smirror = get_mirror(cfg, "security", arch)
LOG.debug("got security mirror: %s", smirror)
+ smirror_key = get_mirror_key(cfg, "security", arch)
# Note: curtin has no cloud-datasource fallback
mirror_info = update_mirror_info(pmirror, smirror, arch)
+ mirror_keys = update_mirror_keys(pmirror_key, smirror_key)
# less complex replacements use only MIRROR, derive from primary
mirror_info["MIRROR"] = mirror_info["PRIMARY"]
+ mirror_info["PRIMARY_KEY"] = mirror_keys["PRIMARY_KEY"]
+ mirror_info["SECURITY_KEY"] = mirror_keys["SECURITY_KEY"]
return mirror_info
diff --git a/tests/unittests/test_apt_custom_sources_list.py b/tests/unittests/test_apt_custom_sources_list.py
index 9737781..b0aec82 100644
--- a/tests/unittests/test_apt_custom_sources_list.py
+++ b/tests/unittests/test_apt_custom_sources_list.py
@@ -135,6 +135,47 @@ deb http://test.ubuntu.com/ubuntu/ notouched-updates main restricted
deb http://testsec.ubuntu.com/ubuntu/ notouched-security main restricted
""")
+EXPECTED_PM_CONTENT_BASE = """\
+Types: deb
+URIs: http://local.ubuntu.com/ubuntu/
+Suites: fakerel fakerel-updates fakerel-backports
+Components: main restricted universe multiverse
+"""
+
+EXPECTED_SM_CONTENT_BASE = """\
+Types: deb
+URIs: http://local.ubuntu.com/ubuntu/
+Suites: fakerel-security
+Components: main restricted universe multiverse
+"""
+
+
+class CaseGenerators:
+ @staticmethod
+ def custom_key_test_case_generator():
+ """Helper function to generate cases of cloud distro,
+ and possible arrangements of provided prim+sec keys
+ """
+ # No need to test pm || sm = None since sources will
+ # default to standard ubuntu/debian archive
+ pm = "http://local.ubuntu.com/ubuntu/"
+ pmkeys = ["fakekey 4321", None]
+ sm = "http://local.ubuntu.com/ubuntu/"
+ smkeys = ["fakekey 1234", None]
+
+ case_list = [
+ {
+ "pm": pm,
+ "pmkey": pmkey,
+ "sm": sm,
+ "smkey": smkey
+ }
+ for pmkey in pmkeys
+ for smkey in smkeys
+ ]
+
+ return case_list
+
def mock_want_deb822(return_value):
def inner(test_func):
@@ -273,7 +314,6 @@ class TestAptSourceConfigSourceList(CiTestCase):
# Avoid useless test. Make sure the strings don't start out equal.
self.assertNotEqual(expected, orig_content)
-
util.write_file(easl, orig_content)
apt_config.handle_apt(cfg, target)
self.assertEqual(expected, util.load_file(easl))
@@ -310,6 +350,71 @@ class TestAptSourceConfigSourceList(CiTestCase):
)
)
+ @mock_want_deb822(True)
+ def test_apt_source_list_psm_deb822_custom_keys(self):
+ """test_apt_v3_source_list_psm_custom_keys - Test specifying
+ primary and security mirrors and keys
+ """
+
+ cases = CaseGenerators.custom_key_test_case_generator()
+ for case in cases:
+ pm = case['pm']
+ sm = case['sm']
+ pmkey = case['pmkey']
+ smkey = case['smkey']
+
+ cfg = {
+ "preserve_sources_list": False,
+ "primary": [
+ {
+ "arches": ["default"],
+ "uri": pm,
+ "key": pmkey,
+ }
+ ],
+ "security": [
+ {
+ "arches": ["default"],
+ "uri": sm,
+ "key": smkey
+ }
+ ],
+ }
+
+ target = self.new_root
+ src_list_path = 'etc/apt/sources.list.d/ubuntu.sources'
+ easl = paths.target_path(target, src_list_path)
+ util.write_file(easl, apt_config.APT_SOURCES_TEMPLATE_DEB822)
+
+ arch = distro.get_architecture()
+ with mock.patch.object(
+ distro,
+ 'get_architecture',
+ return_value=arch
+ ):
+ with mock.patch.object(
+ distro, 'lsb_release',
+ return_value={'codename': 'fakerel'}
+ ):
+ apt_config.handle_apt(cfg, target)
+
+ def_key = "/usr/share/keyrings/ubuntu-archive-keyring.gpg"
+
+ exp_pmkey = pmkey if pmkey else def_key
+ pm_signature = f"Signed-By: {exp_pmkey}"
+ expected_pm = f"{EXPECTED_PM_CONTENT_BASE}{pm_signature}"
+
+ if not smkey:
+ smkey = def_key
+ exp_smkey = pmkey if not sm else smkey
+ sm_signature = f"Signed-By: {exp_smkey}"
+ expected_sm = f"{EXPECTED_SM_CONTENT_BASE}{sm_signature}"
+
+ expected = f"{expected_pm}\n\n{expected_sm}\n"
+ observed = util.load_file(easl)
+
+ self.assertEqual(expected, observed)
+
class TestApplyPreserveSourcesList(CiTestCase):
"""Test apply_preserve_sources_list."""
diff --git a/tests/unittests/test_block.py b/tests/unittests/test_block.py
index 2818fe4..9fcdb43 100644
--- a/tests/unittests/test_block.py
+++ b/tests/unittests/test_block.py
@@ -192,6 +192,11 @@ class TestBlock(CiTestCase):
m_exists.return_value = False
self.assertEqual(None, block.disk_to_byid_path('/dev/sdb'))
+ def test_get_partition_sfdisk_info__not_present(self):
+ with self.assertRaisesRegex(RuntimeError,
+ "not present in sfdisk dump"):
+ block.get_partition_sfdisk_info("/dev/sda1", {"partitions": []})
+
class TestSysBlockPath(CiTestCase):
@mock.patch("os.path.exists")
Follow ups