[Merge] ~mwhudson/curtin:extract-refactor into curtin:master
Michael Hudson-Doyle has proposed merging ~mwhudson/curtin:extract-refactor into curtin:master.
Commit message:
curtin.commands.extract: Factor preparing sources for copying into a class
I want to use this in subiquity. I couldn't help refactoring this a bit
while I was there!
Requested reviews:
curtin developers (curtin-dev)
For more details, see:
https://code.launchpad.net/~mwhudson/curtin/+git/curtin/+merge/410267
--
Your team curtin developers is requested to review the proposed merge of ~mwhudson/curtin:extract-refactor into curtin:master.
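For reviewers, a rough sketch of how a consumer such as subiquity might use the new API (illustrative only and not part of this branch; the source dict and the inspect_tree helper are hypothetical, while the class and function names come from the diff below):

    from curtin.commands.extract import Mounter, get_image_handler_for_source

    # Hypothetical source entry; fsimage and fsimage-layered sources both work.
    source = {'type': 'fsimage-layered',
              'uri': '/cdrom/casper/minimal.standard.squashfs'}

    handler = get_image_handler_for_source(source, Mounter())
    if handler is not None:
        # setup() mounts the image stack (via an overlay when there is more
        # than one layer) and returns the directory holding the root tree.
        root_dir = handler.setup()
        try:
            inspect_tree(root_dir)  # hypothetical consumer-side work
        finally:
            # cleanup() unmounts everything and removes temporary files.
            handler.cleanup()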
diff --git a/curtin/commands/extract.py b/curtin/commands/extract.py
index 069023f..b96430f 100644
--- a/curtin/commands/extract.py
+++ b/curtin/commands/extract.py
@@ -57,125 +57,93 @@ def extract_root_tgz_url(url, target):
'--', url, target])
-def extract_root_fsimage_url(url, target):
- path = _path_from_file_url(url)
- if path != url or os.path.isfile(path):
- return _extract_root_fsimage(path, target)
-
- wfp = tempfile.NamedTemporaryFile(suffix=".img", delete=False)
- wfp.close()
- try:
- url_helper.download(url, wfp.name, retries=3)
- return _extract_root_fsimage(wfp.name, target)
- finally:
- os.unlink(wfp.name)
-
-
-def _extract_root_fsimage(path, target):
- mp = tempfile.mkdtemp()
- try:
- util.subp(['mount', '-o', 'loop,ro', path, mp], capture=True)
- except util.ProcessExecutionError as e:
- LOG.error("Failed to mount '%s' for extraction: %s", path, e)
- os.rmdir(mp)
- raise e
- try:
- return copy_to_target(mp, target)
- finally:
- util.subp(['umount', mp])
- os.rmdir(mp)
-
-
-def extract_root_layered_fsimage_url(uri, target):
- ''' Build images list to consider from a layered structure
-
- uri: URI of the layer file
- target: Target file system to provision
+class Mounter:
+
+ def mount(self, device, mountpoint, *, options=None, type=None):
+ opts = []
+ if options is not None:
+ opts.extend(['-o', options])
+ if type is not None:
+ opts.extend(['-t', type])
+ util.subp(['mount'] + opts + [device, mountpoint], capture=True)
+
+ def unmount(self, mountpoint):
+ util.subp(['umount', mountpoint], capture=True)
+
+
+class LayeredImageHandler:
+
+ def __init__(self, image_stack, mounter):
+ self.image_stack = image_stack
+ self._tmpdir = None
+ self._mounts = []
+ self.mounter = mounter
+
+ def tmpdir(self):
+ if self._tmpdir is None:
+ self._tmpdir = tempfile.mkdtemp()
+ return self._tmpdir
+
+ def download(self):
+ new_image_stack = []
+ for path in self.image_stack:
+ if url_helper.urlparse(path).scheme not in ["", "file"]:
+ new_path = os.path.join(self.tmpdir(), os.path.basename(path))
+ url_helper.download(path, new_path, retries=3)
+ else:
+ new_path = _path_from_file_url(path)
+ new_image_stack.append(new_path)
+ self.image_stack = new_image_stack
- return: None
- '''
- path = _path_from_file_url(uri)
-
- image_stack = _get_image_stack(path)
- LOG.debug("Considering fsimages: '%s'", ",".join(image_stack))
-
- tmp_dir = None
- try:
- # Download every remote images if remote url
- if url_helper.urlparse(path).scheme != "":
- tmp_dir = tempfile.mkdtemp()
- image_stack = _download_layered_images(image_stack, tmp_dir)
-
- # Check that all images exists on disk and are not empty
- for img in image_stack:
- if not os.path.isfile(img) or os.path.getsize(img) <= 0:
- raise ValueError("Failed to use fsimage: '%s' doesn't exist " +
- "or is invalid", img)
-
- return _extract_root_layered_fsimage(image_stack, target)
- finally:
- if tmp_dir and os.path.exists(tmp_dir):
- shutil.rmtree(tmp_dir)
-
-
-def _download_layered_images(image_stack, tmp_dir):
- local_image_stack = []
- try:
- for img_url in image_stack:
- dest_path = os.path.join(tmp_dir,
- os.path.basename(img_url))
- url_helper.download(img_url, dest_path, retries=3)
- local_image_stack.append(dest_path)
- except url_helper.UrlError as e:
- LOG.error("Failed to download '%s'" % img_url)
- raise e
- return local_image_stack
-
-
-def _extract_root_layered_fsimage(image_stack, target):
- mp_base = tempfile.mkdtemp()
- mps = []
- try:
- # Create a mount point for each image file and mount the image
+ def setup(self):
try:
- for img in image_stack:
- mp = os.path.join(mp_base, os.path.basename(img) + ".dir")
+ self.download()
+ # Check that all images exist on disk and are not empty
+ for img in self.image_stack:
+ if not os.path.isfile(img) or os.path.getsize(img) <= 0:
+ raise ValueError(
+ ("Failed to use fsimage: '%s' doesn't exist " +
+ "or is invalid") % (img,))
+ for img in self.image_stack:
+ mp = os.path.join(
+ self.tmpdir(), os.path.basename(img) + ".dir")
os.mkdir(mp)
- util.subp(['mount', '-o', 'loop,ro', img, mp], capture=True)
- mps.insert(0, mp)
- except util.ProcessExecutionError as e:
- LOG.error("Failed to mount '%s' for extraction: %s", img, e)
- raise e
-
- # Prepare
- if len(mps) == 1:
- root_dir = mps[0]
- else:
- # Multiple image files, merge them with an overlay and do the copy
- root_dir = os.path.join(mp_base, "root.dir")
- os.mkdir(root_dir)
- try:
- util.subp(['mount', '-t', 'overlay', 'overlay', '-o',
- 'lowerdir=' + ':'.join(mps), root_dir],
- capture=True)
- mps.append(root_dir)
- except util.ProcessExecutionError as e:
- LOG.error("overlay mount to %s failed: %s", root_dir, e)
- raise e
-
- copy_to_target(root_dir, target)
- finally:
- umount_err_mps = []
- for mp in reversed(mps):
- try:
- util.subp(['umount', mp], capture=True)
- except util.ProcessExecutionError as e:
- LOG.error("can't unmount %s: %e", mp, e)
- umount_err_mps.append(mp)
- if umount_err_mps:
- raise util.ProcessExecutionError(
- "Failed to umount: %s", ", ".join(umount_err_mps))
- shutil.rmtree(mp_base)
+ self.mounter.mount(img, mp, options='loop,ro')
+ self._mounts.append(mp)
+ if len(self._mounts) == 1:
+ root_dir = self._mounts[0]
+ else:
+ # Multiple image files, merge them with an overlay.
+ root_dir = os.path.join(self.tmpdir(), "root.dir")
+ os.mkdir(root_dir)
+ self.mounter.mount(
+ 'overlay', root_dir, type='overlay',
+ options='lowerdir=' + ':'.join(reversed(self._mounts)))
+ self._mounts.append(root_dir)
+ return root_dir
+ except Exception:
+ self.cleanup()
+ raise
+
+ def cleanup(self):
+ for mount in reversed(self._mounts):
+ self.mounter.unmount(mount)
+ self._mounts = []
+ if self._tmpdir is not None:
+ shutil.rmtree(self._tmpdir)
+ self._tmpdir = None
+
+
+class TrivialImageHandler:
+
+ def __init__(self, path):
+ self.path = path
+
+ def setup(self):
+ return self.path
+
+ def cleanup(self):
+ pass
def _get_image_stack(uri):
@@ -186,21 +154,44 @@ def _get_image_stack(uri):
'''
image_stack = []
- root_dir = os.path.dirname(uri)
img_name = os.path.basename(uri)
- _, img_ext = os.path.splitext(img_name)
+ root_dir = uri[:-len(img_name)]
+ img_base, img_ext = os.path.splitext(img_name)
+
+ if not img_base:
+ return []
- img_parts = img_name.split('.')
- # Last item is the extension
- for i in img_parts[:-1]:
+ img_parts = img_base.split('.')
+ for i in range(len(img_parts)):
image_stack.append(
- os.path.join(
- root_dir,
- '.'.join(img_parts[0:img_parts.index(i)+1]) + img_ext))
+ root_dir + '.'.join(img_parts[0:i+1]) + img_ext)
return image_stack
+def get_image_handler_for_source(source, mounter):
+ if source['uri'].startswith("cp://"):
+ return TrivialImageHandler(source['uri'])
+ elif source['type'] == "fsimage":
+ return LayeredImageHandler([source['uri']], mounter)
+ elif source['type'] == "fsimage-layered":
+ return LayeredImageHandler(_get_image_stack(source['uri']), mounter)
+ else:
+ return None
+
+
+def extract_source(source, target, mounter=Mounter()):
+ handler = get_image_handler_for_source(source, mounter)
+ if handler is not None:
+ root_dir = handler.setup()
+ try:
+ copy_to_target(root_dir, target)
+ finally:
+ handler.cleanup()
+ else:
+ extract_root_tgz_url(source['uri'], target=target)
+
+
def copy_to_target(source, target):
if source.startswith("cp://"):
source = source[5:]
@@ -245,14 +236,7 @@ def extract(args):
source['uri']):
if source['type'].startswith('dd-'):
continue
- if source['uri'].startswith("cp://"):
- copy_to_target(source['uri'], target)
- elif source['type'] == "fsimage":
- extract_root_fsimage_url(source['uri'], target=target)
- elif source['type'] == "fsimage-layered":
- extract_root_layered_fsimage_url(source['uri'], target=target)
- else:
- extract_root_tgz_url(source['uri'], target=target)
+ extract_source(source, target)
if cfg.get('write_files'):
LOG.info("Applying write_files from config.")
diff --git a/tests/unittests/test_commands_extract.py b/tests/unittests/test_commands_extract.py
index c318699..51ac28f 100644
--- a/tests/unittests/test_commands_extract.py
+++ b/tests/unittests/test_commands_extract.py
@@ -4,211 +4,343 @@ import os
from .helpers import CiTestCase
from curtin import util
-from curtin.commands.extract import (extract_root_fsimage_url,
- extract_root_layered_fsimage_url,
- _get_image_stack)
+from curtin.commands.extract import (
+ extract_source,
+ _get_image_stack,
+ )
from curtin.url_helper import UrlError
-class TestExtractRootFsImageUrl(CiTestCase):
- """Test extract_root_fsimage_url."""
+class Mount:
+ def __init__(self, device, mountpoint, options, type):
+ self.device = device
+ self.mountpoint = mountpoint
+ self.options = options
+ self.type = type
+ self.unmounted = False
+
+ def __repr__(self):
+ return "Mount({!r}, {!r}, {!r}, {!r})".format(
+ self.device, self.mountpoint, self.options, self.type)
+
+
+class RecordingMounter:
+
+ def __init__(self, testcase):
+ self.testcase = testcase
+ self.mounts = []
+
+ def mount(self, device, mountpoint, *, options=None, type=None):
+ self.testcase.assertTrue(os.path.isdir(mountpoint))
+ self.mounts.append(Mount(device, mountpoint, options, type))
+
+ def unmount(self, mountpoint):
+ for m in reversed(self.mounts):
+ if m.mountpoint == mountpoint and not m.unmounted:
+ m.unmounted = True
+ return
+ else:
+ raise Exception("%s not mounted" % (mountpoint,))
+
+
+class ExtractTestCase(CiTestCase):
+
def _fake_download(self, url, path, retries=0):
self.downloads.append(os.path.abspath(path))
with open(path, "w") as fp:
fp.write("fake content from " + url + "\n")
def setUp(self):
- super(TestExtractRootFsImageUrl, self).setUp()
+ super(ExtractTestCase, self).setUp()
self.downloads = []
self.add_patch("curtin.commands.extract.url_helper.download",
"m_download", side_effect=self._fake_download)
- self.add_patch("curtin.commands.extract._extract_root_fsimage",
- "m__extract_root_fsimage")
+ self.add_patch("curtin.commands.extract.copy_to_target",
+ "m_copy_to_target")
+
+ def tearDown(self):
+ super(ExtractTestCase, self).tearDown()
+ # ensure the files got cleaned up.
+ self.assertEqual([], [f for f in self.downloads if os.path.exists(f)])
+
+ def assert_mounted_and_extracted(self, mounter, fnames, target):
+ self.assertIn(len(mounter.mounts), [len(fnames), len(fnames)+1])
+ expected_lowers = []
+ for mount, fname in zip(mounter.mounts, fnames):
+ self.assertTrue(mount.unmounted)
+ self.assertEqual(mount.device, fname)
+ expected_lowers.append(mount.mountpoint)
+ expected_lowers.reverse()
+ tmp_mount = mounter.mounts[-1]
+ if len(mounter.mounts) == len(fnames)+1:
+ opts = tmp_mount.options.split(',')
+ for opt in opts:
+ if opt.startswith('lowerdir='):
+ seen_lowers = opt[len('lowerdir='):].split(":")
+ break
+ else:
+ self.fail("did not find expected lowerdir option")
+ self.assertEqual(expected_lowers, seen_lowers)
+ self.m_copy_to_target.assert_called_once_with(
+ tmp_mount.mountpoint, target)
+
+
+class TestExtractSourceCp(ExtractTestCase):
+ """Test extract_source with cp sources."""
+
+ def test_cp_uri(self):
+ mounter = RecordingMounter(self)
+ path = self.random_string()
+ target = self.random_string()
+
+ extract_source({'uri': 'cp://' + path}, target, mounter)
+
+ self.assertEqual(0, self.m_download.call_count)
+ self.m_copy_to_target.assert_called_once_with('cp://' + path, target)
+
+
+class TestExtractSourceFsImageUrl(ExtractTestCase):
+ """Test extract_source with fsimage sources."""
+
+ def tmp_path_with_random_content(self, name=None):
+ if name is None:
+ name = self.random_string()
+ tdir = self.tmp_dir()
+ path = os.path.join(tdir, name)
+ util.write_file(path, self.random_string())
+ return path
+
+ def test_abspath(self):
+ mounter = RecordingMounter(self)
+ path = self.tmp_path_with_random_content()
+ target = self.random_string()
+
+ extract_source({'type': 'fsimage', 'uri': path}, target, mounter)
+
+ self.assertEqual(0, self.m_download.call_count)
+ self.assert_mounted_and_extracted(mounter, [path], target)
+
+ def test_abspath_dots(self):
+ mounter = RecordingMounter(self)
+ path = self.tmp_path_with_random_content(name='a.b.c')
+ target = self.random_string()
+
+ extract_source({'type': 'fsimage', 'uri': path}, target, mounter)
+
+ self.assertEqual(0, self.m_download.call_count)
+ self.assert_mounted_and_extracted(mounter, [path], target)
+
+ def test_relpath(self):
+ mounter = RecordingMounter(self)
+ path = self.tmp_path_with_random_content()
+ target = self.random_string()
- def test_relative_file_url(self):
- """extract_root_fsimage_url supports relative file:// urls."""
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
startdir = os.getcwd()
- fname = "my.img"
try:
- os.chdir(tmpd)
- util.write_file(fname, fname + " data\n")
- extract_root_fsimage_url("file://" + fname, target)
+ os.chdir(os.path.dirname(path))
+ extract_source(
+ {'type': 'fsimage', 'uri': os.path.basename(path)},
+ target, mounter)
finally:
os.chdir(startdir)
- self.assertEqual(1, self.m__extract_root_fsimage.call_count)
+
+ self.assertEqual(0, self.m_download.call_count)
+ self.assert_mounted_and_extracted(
+ mounter, [os.path.basename(path)], target)
+
+ def test_abs_fileurl(self):
+ mounter = RecordingMounter(self)
+ path = self.tmp_path_with_random_content()
+ target = self.random_string()
+
+ extract_source(
+ {'type': 'fsimage', 'uri': 'file://' + path},
+ target, mounter)
+
self.assertEqual(0, self.m_download.call_count)
+ self.assert_mounted_and_extracted(mounter, [path], target)
+
+ def test_rel_fileurl(self):
+ mounter = RecordingMounter(self)
+ path = self.tmp_path_with_random_content()
+ target = self.random_string()
+
+ startdir = os.getcwd()
+ try:
+ os.chdir(os.path.dirname(path))
+ extract_source(
+ {'type': 'fsimage', 'uri': 'file://' + os.path.basename(path)},
+ target, mounter)
+ finally:
+ os.chdir(startdir)
- def test_absolute_file_url(self):
- """extract_root_fsimage_url supports absolute file:/// urls."""
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
- fpath = self.tmp_path("my.img", tmpd)
- util.write_file(fpath, fpath + " data\n")
- extract_root_fsimage_url("file:///" + fpath, target)
- self.assertEqual(1, self.m__extract_root_fsimage.call_count)
self.assertEqual(0, self.m_download.call_count)
+ self.assert_mounted_and_extracted(
+ mounter, [os.path.basename(path)], target)
def test_http_url(self):
"""extract_root_fsimage_url supports http:// urls."""
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
- myurl = "http://bogus.example.com/my.img"
- extract_root_fsimage_url(myurl, target)
- self.assertEqual(1, self.m__extract_root_fsimage.call_count)
+ mounter = RecordingMounter(self)
+ path = self.random_string()
+ target = self.random_string()
+
+ extract_source(
+ {'type': 'fsimage', 'uri': 'http://' + path},
+ target, mounter)
+
self.assertEqual(1, self.m_download.call_count)
- # ensure the file got cleaned up.
- self.assertEqual(1, len(self.downloads))
- self.assertEqual([], [f for f in self.downloads if os.path.exists(f)])
+ self.assert_mounted_and_extracted(mounter, self.downloads, target)
- def test_file_path_not_url(self):
- """extract_root_fsimage_url supports normal file path without file:."""
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
- fpath = self.tmp_path("my.img", tmpd)
- util.write_file(fpath, fpath + " data\n")
- extract_root_fsimage_url(os.path.abspath(fpath), target)
- self.assertEqual(1, self.m__extract_root_fsimage.call_count)
- self.assertEqual(0, self.m_download.call_count)
+class TestExtractSourceLayeredFsImageUrl(ExtractTestCase):
+ """Test extract_source with fsimage-layered sources."""
-class TestExtractRootLayeredFsImageUrl(CiTestCase):
- """Test extract_root_layared_fsimage_url."""
- def _fake_download(self, url, path, retries=0):
- self.downloads.append(os.path.abspath(path))
- with open(path, "w") as fp:
- fp.write("fake content from " + url + "\n")
+ def tmp_paths_with_random_content(self, names):
+ tdir = self.tmp_dir()
+ paths = []
+ longest = ''
+ for name in names:
+ path = os.path.join(tdir, name)
+ util.write_file(path, self.random_string())
+ if len(path) > len(longest):
+ longest = path
+ paths.append(path)
+ return paths, longest
- def setUp(self):
- super(TestExtractRootLayeredFsImageUrl, self).setUp()
- self.downloads = []
- self.add_patch("curtin.commands.extract.url_helper.download",
- "m_download", side_effect=self._fake_download)
- self.add_patch("curtin.commands.extract._extract_root_layered_fsimage",
- "m__extract_root_layered_fsimage")
+ def test_absolute_file_path_single(self):
+ mounter = RecordingMounter(self)
+ paths, longest = self.tmp_paths_with_random_content(['base.ext'])
+ target = self.random_string()
+
+ extract_source(
+ {'type': 'fsimage-layered', 'uri': longest},
+ target, mounter)
+
+ self.assertEqual(0, self.m_download.call_count)
+ self.assert_mounted_and_extracted(mounter, paths, target)
+
+ def test_relative_file_path_single(self):
+ mounter = RecordingMounter(self)
+ paths, longest = self.tmp_paths_with_random_content(['base.ext'])
+ target = self.random_string()
- def test_relative_local_file_single(self):
- """extract_root_layered_fsimage_url supports relative file:// uris."""
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
startdir = os.getcwd()
- fname = "my.img"
try:
- os.chdir(tmpd)
- util.write_file(fname, fname + " data\n")
- extract_root_layered_fsimage_url("file://" + fname, target)
+ os.chdir(os.path.dirname(longest))
+ extract_source(
+ {'type': 'fsimage-layered', 'uri': os.path.basename(longest)},
+ target, mounter)
finally:
os.chdir(startdir)
- self.assertEqual(1, self.m__extract_root_layered_fsimage.call_count)
+
self.assertEqual(0, self.m_download.call_count)
+ self.assert_mounted_and_extracted(
+ mounter, [os.path.basename(path) for path in paths], target)
+
+ def test_absolute_file_url_single(self):
+ mounter = RecordingMounter(self)
+ paths, longest = self.tmp_paths_with_random_content(['base.ext'])
+ target = self.random_string()
+
+ extract_source(
+ {'type': 'fsimage-layered', 'uri': 'file://' + longest},
+ target, mounter)
- def test_absolute_local_file_single(self):
- """extract_root_layered_fsimage_url supports absolute file:/// uris."""
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
- fpath = self.tmp_path("my.img", tmpd)
- util.write_file(fpath, fpath + " data\n")
- extract_root_layered_fsimage_url("file:///" + fpath, target)
- self.assertEqual(1, self.m__extract_root_layered_fsimage.call_count)
self.assertEqual(0, self.m_download.call_count)
+ self.assert_mounted_and_extracted(mounter, paths, target)
+
+ def test_relative_file_url_single(self):
+ mounter = RecordingMounter(self)
+ paths, longest = self.tmp_paths_with_random_content(['base.ext'])
+ target = self.random_string()
+
+ startdir = os.getcwd()
+ try:
+ os.chdir(os.path.dirname(longest))
+ extract_source(
+ {
+ 'type': 'fsimage-layered',
+ 'uri': 'file://' + os.path.basename(longest),
+ },
+ target, mounter)
+ finally:
+ os.chdir(startdir)
- def test_local_file_path_single(self):
- """extract_root_layered_fsimage_url supports normal file path without
- file:"""
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
- fpath = self.tmp_path("my.img", tmpd)
- util.write_file(fpath, fpath + " data\n")
- extract_root_layered_fsimage_url(os.path.abspath(fpath), target)
- self.assertEqual(1, self.m__extract_root_layered_fsimage.call_count)
self.assertEqual(0, self.m_download.call_count)
+ self.assert_mounted_and_extracted(
+ mounter, [os.path.basename(path) for path in paths], target)
def test_local_file_path_multiple(self):
- """extract_root_layered_fsimage_url supports normal hierarchy file
- path"""
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
- arg = os.path.abspath(self.tmp_path("minimal.standard.debug.squashfs",
- tmpd))
- for f in ["minimal.squashfs",
- "minimal.standard.squashfs",
- "minimal.standard.debug.squashfs"]:
- fpath = self.tmp_path(f, tmpd)
- util.write_file(fpath, fpath + " data\n")
- extract_root_layered_fsimage_url(arg, target)
- self.assertEqual(1, self.m__extract_root_layered_fsimage.call_count)
+ mounter = RecordingMounter(self)
+ paths, longest = self.tmp_paths_with_random_content(
+ ['base.ext', 'base.overlay.ext', 'base.overlay.other.ext'])
+ target = self.random_string()
+
+ extract_source(
+ {'type': 'fsimage-layered', 'uri': longest},
+ target, mounter)
+
self.assertEqual(0, self.m_download.call_count)
+ self.assert_mounted_and_extracted(mounter, paths, target)
def test_local_file_path_multiple_one_missing(self):
- """extract_root_layered_fsimage_url supports normal hierarchy file
- path but intermediate layer missing"""
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
- arg = os.path.abspath(self.tmp_path("minimal.standard.debug.squashfs",
- tmpd))
- for f in ["minimal.squashfs",
- "minimal.standard.debug.squashfs"]:
- fpath = self.tmp_path(f, tmpd)
- util.write_file(fpath, fpath + " data\n")
- self.assertRaises(ValueError, extract_root_layered_fsimage_url, arg,
- target)
- self.assertEqual(0, self.m__extract_root_layered_fsimage.call_count)
- self.assertEqual(0, self.m_download.call_count)
+ mounter = RecordingMounter(self)
+ paths, longest = self.tmp_paths_with_random_content(
+ ['base.ext', 'base.overlay.other.ext'])
+ target = self.random_string()
+
+ self.assertRaises(
+ ValueError, extract_source,
+ {'type': 'fsimage-layered', 'uri': longest},
+ target, mounter)
def test_local_file_path_multiple_one_empty(self):
- """extract_root_layered_fsimage_url supports normal hierarchy file
- path but intermediate layer empty"""
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
- arg = os.path.abspath(self.tmp_path("minimal.standard.debug.squashfs",
- tmpd))
- for f in ["minimal.squashfs",
- "minimal.standard.squashfs"
- "minimal.standard.debug.squashfs"]:
- fpath = self.tmp_path(f, tmpd)
- if f == "minimal.standard.squashfs":
- util.write_file(fpath, "")
- else:
- util.write_file(fpath, fpath + " data\n")
- self.assertRaises(ValueError, extract_root_layered_fsimage_url, arg,
- target)
- self.assertEqual(0, self.m__extract_root_layered_fsimage.call_count)
- self.assertEqual(0, self.m_download.call_count)
+ mounter = RecordingMounter(self)
+ paths, longest = self.tmp_paths_with_random_content(
+ ['base.ext', 'base.overlay.ext', 'base.overlay.other.ext'])
+ target = self.random_string()
+ util.write_file(paths[1], '')
+
+ self.assertRaises(
+ ValueError, extract_source,
+ {'type': 'fsimage-layered', 'uri': longest},
+ target, mounter)
def test_remote_file_single(self):
- """extract_root_layered_fsimage_url supports http:// urls."""
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
+ mounter = RecordingMounter(self)
+ target = self.random_string()
myurl = "http://example.io/minimal.squashfs"
- extract_root_layered_fsimage_url(myurl, target)
- self.assertEqual(1, self.m__extract_root_layered_fsimage.call_count)
+
+ extract_source(
+ {'type': 'fsimage-layered', 'uri': myurl},
+ target, mounter)
+
self.assertEqual(1, self.m_download.call_count)
self.assertEqual("http://example.io/minimal.squashfs",
self.m_download.call_args_list[0][0][0])
- # ensure the file got cleaned up.
- self.assertEqual([], [f for f in self.downloads if os.path.exists(f)])
+ self.assert_mounted_and_extracted(mounter, self.downloads, target)
def test_remote_file_multiple(self):
- """extract_root_layered_fsimage_url supports normal hierarchy from
- http:// urls."""
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
+ mounter = RecordingMounter(self)
+ target = self.random_string()
myurl = "http://example.io/minimal.standard.debug.squashfs"
- extract_root_layered_fsimage_url(myurl, target)
- self.assertEqual(1, self.m__extract_root_layered_fsimage.call_count)
+
+ extract_source(
+ {'type': 'fsimage-layered', 'uri': myurl},
+ target, mounter)
+
self.assertEqual(3, self.m_download.call_count)
for i, image_url in enumerate(["minimal.squashfs",
"minimal.standard.squashfs",
"minimal.standard.debug.squashfs"]):
self.assertEqual("http://example.io/" + image_url,
self.m_download.call_args_list[i][0][0])
- # ensure the file got cleaned up.
- self.assertEqual([], [f for f in self.downloads if os.path.exists(f)])
+ self.assert_mounted_and_extracted(mounter, self.downloads, target)
def test_remote_file_multiple_one_missing(self):
- """extract_root_layered_fsimage_url supports normal hierarchy from
- http:// urls with one layer missing."""
+ mounter = RecordingMounter(self)
+ target = self.random_string()
+ myurl = "http://example.io/minimal.standard.debug.squashfs"
def fail_download_minimal_standard(url, path, retries=0):
if url == "http://example.io/minimal.standard.squashfs":
@@ -217,23 +349,21 @@ class TestExtractRootLayeredFsImageUrl(CiTestCase):
return self._fake_download(url, path, retries)
self.m_download.side_effect = fail_download_minimal_standard
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
- myurl = "http://example.io/minimal.standard.debug.squashfs"
- self.assertRaises(UrlError, extract_root_layered_fsimage_url,
- myurl, target)
- self.assertEqual(0, self.m__extract_root_layered_fsimage.call_count)
+ self.assertRaises(
+ UrlError, extract_source,
+ {'type': 'fsimage-layered', 'uri': myurl},
+ target, mounter)
+ self.assertEqual(0, self.m_copy_to_target.call_count)
self.assertEqual(2, self.m_download.call_count)
for i, image_url in enumerate(["minimal.squashfs",
"minimal.standard.squashfs"]):
self.assertEqual("http://example.io/" + image_url,
self.m_download.call_args_list[i][0][0])
- # ensure the file got cleaned up.
- self.assertEqual([], [f for f in self.downloads if os.path.exists(f)])
def test_remote_file_multiple_one_empty(self):
- """extract_root_layered_fsimage_url supports normal hierarchy from
- http:// urls with one layer empty."""
+ mounter = RecordingMounter(self)
+ target = self.random_string()
+ myurl = "http://example.io/minimal.standard.debug.squashfs"
def empty_download_minimal_standard(url, path, retries=0):
if url == "http://example.io/minimal.standard.squashfs":
@@ -244,20 +374,17 @@ class TestExtractRootLayeredFsImageUrl(CiTestCase):
return self._fake_download(url, path, retries)
self.m_download.side_effect = empty_download_minimal_standard
- tmpd = self.tmp_dir()
- target = self.tmp_path("target_d", tmpd)
- myurl = "http://example.io/minimal.standard.debug.squashfs"
- self.assertRaises(ValueError, extract_root_layered_fsimage_url,
- myurl, target)
- self.assertEqual(0, self.m__extract_root_layered_fsimage.call_count)
+ self.assertRaises(
+ ValueError, extract_source,
+ {'type': 'fsimage-layered', 'uri': myurl},
+ target, mounter)
+ self.assertEqual(0, self.m_copy_to_target.call_count)
self.assertEqual(3, self.m_download.call_count)
for i, image_url in enumerate(["minimal.squashfs",
"minimal.standard.squashfs",
"minimal.standard.debug.squashfs"]):
self.assertEqual("http://example.io/" + image_url,
self.m_download.call_args_list[i][0][0])
- # ensure the file got cleaned up.
- self.assertEqual([], [f for f in self.downloads if os.path.exists(f)])
class TestGetImageStack(CiTestCase):
@@ -294,4 +421,11 @@ class TestGetImageStack(CiTestCase):
'https://path.com/to/aa.bbb.cccc.fs'],
_get_image_stack("https://path.com/to/aa.bbb.cccc.fs"))
+ def test_get_image_stack_relative_file_urls(self):
+ self.assertEqual(
+ ['file://aa.fs',
+ 'file://aa.bbb.fs',
+ 'file://aa.bbb.cccc.fs'],
+ _get_image_stack("file://aa.bbb.cccc.fs"))
+
# vi: ts=4 expandtab syntax=python
Follow ups:
- [Merge] ~mwhudson/curtin:extract-refactor into curtin:master (From: Server Team CI bot, 2021-11-01)
- [Merge] ~mwhudson/curtin:extract-refactor into curtin:master (From: Michael Hudson-Doyle, 2021-11-01)
- Re: [Merge] ~mwhudson/curtin:extract-refactor into curtin:master (From: Server Team CI bot, 2021-10-28)
- Re: [Merge] ~mwhudson/curtin:extract-refactor into curtin:master (From: Server Team CI bot, 2021-10-28)
- Re: [Merge] ~mwhudson/curtin:extract-refactor into curtin:master (From: Server Team CI bot, 2021-10-27)
- Re: [Merge] ~mwhudson/curtin:extract-refactor into curtin:master (From: Server Team CI bot, 2021-10-26)
- Re: [Merge] ~mwhudson/curtin:extract-refactor into curtin:master (From: Server Team CI bot, 2021-10-20)
- Re: [Merge] ~mwhudson/curtin:extract-refactor into curtin:master (From: Michael Hudson-Doyle, 2021-10-20)
- Re: [Merge] ~mwhudson/curtin:extract-refactor into curtin:master (From: Dan Bungert, 2021-10-19)
- Re: [Merge] ~mwhudson/curtin:extract-refactor into curtin:master (From: Server Team CI bot, 2021-10-18)
- Re: [Merge] ~mwhudson/curtin:extract-refactor into curtin:master (From: Server Team CI bot, 2021-10-15)
- Re: [Merge] ~mwhudson/curtin:extract-refactor into curtin:master (From: Michael Hudson-Doyle, 2021-10-15)