curtin-dev team mailing list archive
Message #03044
[Merge] ~mwhudson/curtin:action-objects-more into curtin:master
Michael Hudson-Doyle has proposed merging ~mwhudson/curtin:action-objects-more into curtin:master.
Commit message:
block_meta: continue to turn dictionaries into objects
dasd, disk and partition handlers.
Requested reviews:
curtin developers (curtin-dev)
For more details, see:
https://code.launchpad.net/~mwhudson/curtin/+git/curtin/+merge/448442
--
Your team curtin developers is requested to review the proposed merge of ~mwhudson/curtin:action-objects-more into curtin:master.
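For readers skimming the diff: the pattern being extended here is to convert each storage-config dictionary into a typed attrs object once (via storage_actions.define/asobject) and have the handler read attributes instead of calling info.get(...) throughout. The snippet below is only an illustrative sketch of that idea using plain attrs; the class, field names and helper function are made up for the example and are not curtin's actual storage_actions implementation, which is what the diff itself adds.

from typing import Optional

import attr


@attr.s(auto_attribs=True, kw_only=True)
class Partition:
    # Fields mirror the keys the old dict-based partition handler read.
    id: str
    device: str
    size: int
    flag: Optional[str] = None
    preserve: bool = False


def partition_from_config(info: dict) -> Partition:
    # Old style: info.get('device'), info.get('size'), ... scattered through
    # the handler body.  New style: convert once, then use attribute access.
    return Partition(
        id=info["id"],
        device=info["device"],
        size=int(info["size"]),
        flag=info.get("flag"),
        preserve=bool(info.get("preserve", False)),
    )


action = partition_from_config(
    {"id": "p1", "type": "partition", "device": "sda", "size": 1 << 30})
print(action.device, action.size, action.flag)
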
diff --git a/curtin/block/mkfs.py b/curtin/block/mkfs.py
index 20ee7b8..4e716e7 100644
--- a/curtin/block/mkfs.py
+++ b/curtin/block/mkfs.py
@@ -230,15 +230,12 @@ def mkfs(path, fstype, strict=False, label=None, uuid=None, force=False,
return uuid
-def mkfs_from_config(path, info, strict=False):
+def mkfs_from_action(path, format, strict=False):
"""Make filesystem on block device with given path according to storage
config given"""
- fstype = info.get('fstype')
- if fstype is None:
- raise ValueError("fstype must be specified")
# NOTE: Since old metadata on partitions that have not been wiped can cause
# some mkfs commands to refuse to work, it's best to use force=True
- mkfs(path, fstype, strict=strict, force=True, uuid=info.get('uuid'),
- label=info.get('label'), extra_options=info.get('extra_options'))
+ mkfs(path, format.fstype, strict=strict, force=True, uuid=format.uuid,
+ label=format.label, extra_options=format.extra_options)
# vi: ts=4 expandtab syntax=python
diff --git a/curtin/commands/block_meta.py b/curtin/commands/block_meta.py
index 11972b9..fc61e21 100644
--- a/curtin/commands/block_meta.py
+++ b/curtin/commands/block_meta.py
@@ -3,8 +3,9 @@
from collections import OrderedDict, namedtuple
from curtin import (block, compat, config, paths, storage_actions, util)
from curtin.block import schemas
-from curtin.block import (bcache, clear_holders, dasd, iscsi, lvm, mdadm, mkfs,
+from curtin.block import (bcache, clear_holders, iscsi, lvm, mdadm, mkfs,
multipath, zfs)
+from curtin.block.dasd import DasdDevice, DasdPartitionTable
from curtin import distro
from curtin.log import LOG, logged_time
from curtin.reporter import events
@@ -23,6 +24,8 @@ from curtin.udev import (
udevadm_trigger,
)
+import attr
+
import glob
import json
import os
@@ -31,6 +34,7 @@ import string
import sys
import tempfile
import time
+from typing import List, Optional
FstabData = namedtuple(
@@ -471,7 +475,7 @@ def v1_get_path_to_disk(vol):
volume_path = '/dev/mapper/' + (
multipath.get_mpath_id_from_device(volume_path))
elif disk_key == 'device_id':
- dasd_device = dasd.DasdDevice(vol_value)
+ dasd_device = DasdDevice(vol_value)
volume_path = dasd_device.devname
except ValueError:
continue
@@ -533,7 +537,7 @@ def v2_get_path_to_disk(vol):
*disk_by_keys(
vol['serial'], 'DM_SERIAL', 'ID_SERIAL', 'ID_SERIAL_SHORT'))
if 'device_id' in vol:
- dasd_device = dasd.DasdDevice(vol['device_id'])
+ dasd_device = DasdDevice(vol['device_id'])
cands.append(set([dasd_device.devname]))
volume_path = dasd_device.devname
# verify path exists otherwise try the next key
@@ -667,11 +671,6 @@ class Image:
sector_size: int = storage_actions.size(default=512)
-@storage_actions.define("device")
-class Device:
- path: str
-
-
def image_handler(info, storage_config, context):
image: Image = storage_actions.asobject(info)
path = image.path
@@ -709,12 +708,28 @@ def image_handler(info, storage_config, context):
context.handlers['disk'](info, storage_config, context)
+@storage_actions.define("device")
+class Device:
+ path: str
+
+
def device_handler(info, storage_config, context):
device: Device = storage_actions.asobject(info)
context.id_to_device[device.id] = device.path
context.handlers['disk'](info, storage_config, context)
+@storage_actions.define("dasd")
+class Dasd:
+ device_id: str
+ blocksize: int
+ label: Optional[str] = None
+ mode: str
+ disk_layout: str
+ wipe: bool = False
+ preserve: bool = False
+
+
def dasd_handler(info, storage_config, context):
""" Prepare the specified dasd device per configuration
@@ -733,92 +748,103 @@ def dasd_handler(info, storage_config, context):
'disk_layout': 'cdl',
}
"""
- device_id = info.get('device_id')
- blocksize = info.get('blocksize')
- disk_layout = info.get('disk_layout')
- label = info.get('label')
- mode = info.get('mode')
- force_format = config.value_as_boolean(info.get('wipe'))
-
- dasd_device = dasd.DasdDevice(device_id)
- if (force_format or dasd_device.needs_formatting(blocksize,
- disk_layout, label)):
- if config.value_as_boolean(info.get('preserve')):
+ dasd: Dasd = storage_actions.asobject(info)
+ force_format = config.value_as_boolean(dasd.wipe)
+
+ dasd_device = DasdDevice(dasd.device_id)
+ if (force_format or dasd_device.needs_formatting(dasd.blocksize,
+ dasd.disk_layout,
+ dasd.label)):
+ if config.value_as_boolean(dasd.preserve):
raise ValueError(
"dasd '%s' does not match configured properties and"
"preserve is set to true. The dasd needs formatting"
- "with the specified parameters to continue." % info.get('id'))
+ "with the specified parameters to continue." % dasd.id)
LOG.debug('Formatting dasd id=%s device_id=%s devname=%s',
- info.get('id'), device_id, dasd_device.devname)
- dasd_device.format(blksize=blocksize, layout=disk_layout,
- set_label=label, mode=mode)
+ dasd.id, dasd.device_id, dasd_device.devname)
+ dasd_device.format(blksize=dasd.blocksize, layout=dasd.disk_layout,
+ set_label=dasd.label, mode=dasd.mode)
# check post-format to ensure values match
- if dasd_device.needs_formatting(blocksize, disk_layout, label):
+ if dasd_device.needs_formatting(
+ dasd.blocksize, dasd.disk_layout, dasd.label):
raise RuntimeError(
"Dasd %s failed to format" % dasd_device.devname)
+@attr.s(auto_attribs=True)
+class _Partitionable:
+ ptable: Optional[str] = None
+ preserve: bool = storage_actions.boolean(default=False)
+ wipe: Optional[str] = None
+ name: Optional[str] = None
+
+
+@storage_actions.define("disk")
+class Disk(_Partitionable):
+ pass
+
+
def disk_handler(info, storage_config, context):
+ disk: _Partitionable = storage_actions.asobject(info)
_dos_names = ['dos', 'msdos']
- ptable = info.get('ptable')
- if ptable and ptable not in PTABLES_VALID:
+ if disk.ptable and disk.ptable not in PTABLES_VALID:
raise ValueError(
- 'Invalid partition table type: %s in %s' % (ptable, info))
+ 'Invalid partition table type: %s in %s' % (disk.ptable, disk))
- disk = get_path_to_storage_volume(info.get('id'), storage_config)
- context.id_to_device[info['id']] = disk
+ disk_path = get_path_to_storage_volume(disk.id, storage_config)
+ context.id_to_device[disk.id] = disk_path
# For disks, 'preserve' is what indicates whether the partition
# table should be reused or recreated but for compound devices
# such as raids, it indicates if the raid should be created or
# assumed to already exist. So to allow a pre-existing raid to get
# a new partition table, we use presence of 'wipe' field to
# indicate if the disk should be reformatted or not.
- if info['type'] == 'disk':
- preserve_ptable = config.value_as_boolean(info.get('preserve'))
+ if disk.type == 'disk':
+ preserve_ptable = disk.preserve
else:
- preserve_ptable = config.value_as_boolean(info.get('preserve')) \
- and not config.value_as_boolean(info.get('wipe'))
+ preserve_ptable = disk.preserve and not disk.wipe
if preserve_ptable:
# Handle preserve flag, verifying if ptable specified in config
- if ptable and ptable != PTABLE_UNSUPPORTED:
- current_ptable = block.get_part_table_type(disk)
+ if disk.ptable and disk.ptable != PTABLE_UNSUPPORTED:
+ current_ptable = block.get_part_table_type(disk_path)
LOG.debug('disk: current ptable type: %s', current_ptable)
if current_ptable not in PTABLES_SUPPORTED:
raise ValueError(
"disk '%s' does not have correct partition table or "
"cannot be read, but preserve is set to true (or wipe is "
"not set). cannot continue installation." %
- info.get('id'))
+ disk.id)
LOG.info("disk '%s' marked to be preserved, so keeping partition "
- "table" % disk)
+ "table" % disk.id)
else:
# wipe the disk and create the partition table if instructed to do so
- if config.value_as_boolean(info.get('wipe')):
- block.wipe_volume(disk, mode=info.get('wipe'))
- if config.value_as_boolean(ptable):
- LOG.info("labeling device: '%s' with '%s' partition table", disk,
- ptable)
- if ptable == "gpt":
+ if config.value_as_boolean(disk.wipe):
+ block.wipe_volume(disk_path, mode=disk.wipe)
+ if disk.ptable is not None:
+ LOG.info(
+ "labeling device: '%s' with '%s' partition table", disk_path,
+ disk.ptable)
+ if disk.ptable == "gpt":
# Wipe both MBR and GPT that may be present on the disk.
# N.B.: wipe_volume wipes 1M at front and end of the disk.
# This could destroy disk data in filesystems that lived
# there.
- block.wipe_volume(disk, mode='superblock')
- elif ptable in _dos_names:
- util.subp(["parted", disk, "--script", "mklabel", "msdos"])
- elif ptable == "vtoc":
- util.subp(["fdasd", "-c", "/dev/null", disk])
- holders = clear_holders.get_holders(disk)
+ block.wipe_volume(disk_path, mode='superblock')
+ elif disk.ptable in _dos_names:
+ util.subp(["parted", disk_path, "--script", "mklabel", "msdos"])
+ elif disk.ptable == "vtoc":
+ util.subp(["fdasd", "-c", "/dev/null", disk_path])
+ holders = clear_holders.get_holders(disk_path)
if len(holders) > 0:
LOG.info('Detected block holders on disk %s: %s', disk, holders)
- clear_holders.clear_holders(disk)
- clear_holders.assert_clear(disk)
+ clear_holders.clear_holders(disk_path)
+ clear_holders.assert_clear(disk_path)
# Make the name if needed
- if info.get('name'):
- make_dname(info.get('id'), storage_config)
+ if disk.name:
+ make_dname(disk.id, storage_config)
def getnumberoflogicaldisks(device, storage_config):
@@ -969,27 +995,26 @@ def verify_ptable_flag(devpath, expected_flag, label, part_info):
raise RuntimeError(msg)
-def partition_verify_sfdisk(part_action, label, sfdisk_part_info):
+def partition_verify_sfdisk(partition: "Partition", label, sfdisk_part_info):
devpath = os.path.realpath(sfdisk_part_info['node'])
verify_size(
- devpath, int(util.human2bytes(part_action['size'])), sfdisk_part_info)
- expected_flag = part_action.get('flag')
- if expected_flag:
- verify_ptable_flag(devpath, expected_flag, label, sfdisk_part_info)
+ devpath, partition.size, sfdisk_part_info)
+ if partition.flag:
+ verify_ptable_flag(devpath, partition.flag, label, sfdisk_part_info)
-def partition_verify_fdasd(disk_path, partnumber, info):
+def partition_verify_fdasd(disk_path, partnumber, partition: "Partition"):
verify_exists(disk_path)
- pt = dasd.DasdPartitionTable.from_fdasd(disk_path)
+ pt = DasdPartitionTable.from_fdasd(disk_path)
pt_entry = pt.partitions[partnumber-1]
- expected_tracks = pt.tracks_needed(util.human2bytes(info['size']))
+ expected_tracks = pt.tracks_needed(partition.size)
msg = (
'Verifying %s part %s size, expecting %s tracks, found %s tracks' % (
disk_path, partnumber, expected_tracks, pt_entry.length))
LOG.debug(msg)
if expected_tracks != pt_entry.length:
raise RuntimeError(msg)
- if info.get('flag', 'linux') != 'linux':
+ if partition.flag not in [None, 'linux']:
raise RuntimeError("dasd partitions do not support flags")
@@ -1002,23 +1027,28 @@ def check_passed_path(info, actual_path):
info["type"], info, info['path'], actual_path))
+@storage_actions.define("partition")
+class Partition:
+ device: str
+ size: int = storage_actions.size()
+ flag: Optional[str] = None
+ path: Optional[str] = None
+ preserve: bool = storage_actions.boolean(default=False)
+ wipe: Optional[str] = None
+
+
def partition_handler(info, storage_config, context):
- device = info.get('device')
- size = info.get('size')
- flag = info.get('flag')
- disk_ptable = storage_config.get(device).get('ptable')
+ partition: Partition = storage_actions.asobject(info)
+ disk_ptable = storage_config.get(partition.device).get('ptable')
partition_type = None
- if not device:
- raise ValueError("device must be set for partition to be created")
- if not size:
- raise ValueError("size must be specified for partition to be created")
- disk = get_path_to_storage_volume(device, storage_config)
- partnumber = determine_partition_number(info.get('id'), storage_config)
+ disk = get_path_to_storage_volume(partition.device, storage_config)
+ partnumber = determine_partition_number(partition.id, storage_config)
disk_kname = block.path_to_kname(disk)
part_path = block.dev_path(block.partition_kname(disk_kname, partnumber))
- check_passed_path(info, part_path)
- context.id_to_device[info['id']] = part_path
+ if partition.path is not None:
+ check_passed_path({'path': partition.path}, part_path)
+ context.id_to_device[partition.id] = part_path
# consider the disks logical sector size when calculating sectors
try:
@@ -1032,16 +1062,18 @@ def partition_handler(info, storage_config, context):
if partnumber > 1:
pnum = None
if partnumber == 5 and disk_ptable == "msdos":
- extended_part_id = find_extended_partition(device, storage_config)
+ extended_part_id = find_extended_partition(
+ partition.device, storage_config)
if not extended_part_id:
msg = ("Logical partition id=%s requires an extended partition"
" and no extended partition '(type: partition, flag: "
"extended)' was found in the storage config.")
- LOG.error(msg, info['id'])
- raise RuntimeError(msg, info['id'])
+ LOG.error(msg, partition.id)
+ raise RuntimeError(msg, partition.id)
pnum = determine_partition_number(extended_part_id, storage_config)
else:
- pnum = find_previous_partition(device, info['id'], storage_config)
+ pnum = find_previous_partition(
+ partition.device, partition.id, storage_config)
# In case we fail to find previous partition let's error out now
if pnum is None:
@@ -1049,7 +1081,7 @@ def partition_handler(info, storage_config, context):
'Cannot find previous partition on disk %s' % disk)
LOG.debug("previous partition number for '%s' found to be '%s'",
- info.get('id'), pnum)
+ partition.id, pnum)
partition_kname = block.partition_kname(disk_kname, pnum)
LOG.debug('partition_kname=%s', partition_kname)
(previous_start_sectors, previous_size_sectors) = (
@@ -1062,12 +1094,12 @@ def partition_handler(info, storage_config, context):
offset_sectors = alignment_offset
else:
# further partitions
- if disk_ptable == "gpt" or flag != "logical":
+ if disk_ptable == "gpt" or partition.flag != "logical":
# msdos primary and any gpt part start after former partition end
offset_sectors = previous_start_sectors + previous_size_sectors
else:
# msdos extended/logical partitions
- if flag == "logical":
+ if partition.flag == "logical":
if partnumber == 5:
# First logical partition
# start at extended partition start + alignment_offset
@@ -1080,26 +1112,25 @@ def partition_handler(info, storage_config, context):
previous_size_sectors +
alignment_offset)
- length_bytes = util.human2bytes(size)
# start sector is part of the sectors that define the partitions size
# so length has to be "size in sectors - 1"
- length_sectors = int(length_bytes / logical_block_size_bytes) - 1
+ length_sectors = int(partition.size / logical_block_size_bytes) - 1
# logical partitions can't share their start sector with the extended
# partition and logical partitions can't go head-to-head, so we have to
# realign and for that increase size as required
- if info.get('flag') == "extended":
- logdisks = getnumberoflogicaldisks(device, storage_config)
+ if partition.flag == "extended":
+ logdisks = getnumberoflogicaldisks(partition.device, storage_config)
length_sectors = length_sectors + (logdisks * alignment_offset)
# Handle preserve flag
create_partition = True
- if config.value_as_boolean(info.get('preserve')):
+ if partition.preserve:
if disk_ptable == 'vtoc':
- partition_verify_fdasd(disk, partnumber, info)
+ partition_verify_fdasd(disk, partnumber, partition)
else:
sfdisk_info = block.sfdisk_info(disk)
part_info = block.get_partition_sfdisk_info(part_path, sfdisk_info)
- partition_verify_sfdisk(info, sfdisk_info['label'], part_info)
+ partition_verify_sfdisk(partition, sfdisk_info['label'], part_info)
LOG.debug(
'%s partition %s already present, skipping create',
disk, partnumber)
@@ -1109,17 +1140,17 @@ def partition_handler(info, storage_config, context):
# Set flag
# 'sgdisk --list-types'
LOG.info("adding partition '%s' to disk '%s' (ptable: '%s')",
- info.get('id'), device, disk_ptable)
+ partition.id, partition.device, disk_ptable)
LOG.debug("partnum: %s offset_sectors: %s length_sectors: %s",
partnumber, offset_sectors, length_sectors)
# Pre-Wipe the partition if told to do so, do not wipe dos extended
# partitions as this may damage the extended partition table
- if config.value_as_boolean(info.get('wipe')):
+ if config.value_as_boolean(partition.wipe):
LOG.info("Preparing partition location on disk %s", disk)
- if info.get('flag') == "extended":
+ if partition.flag == "extended":
LOG.warn("extended partitions do not need wiping, "
- "so skipping: '%s'" % info.get('id'))
+ "so skipping: '%s'" % partition.id)
else:
# wipe the start of the new partition first by zeroing 1M at
# the length of the previous partition
@@ -1132,26 +1163,26 @@ def partition_handler(info, storage_config, context):
exclusive=False)
if disk_ptable == "msdos":
- if flag and flag == 'prep':
+ if partition.flag == 'prep':
raise ValueError(
'PReP partitions require a GPT partition table')
- if flag in ["extended", "logical", "primary"]:
- partition_type = flag
+ if partition.flag in ["extended", "logical", "primary"]:
+ partition_type = partition.flag
else:
partition_type = "primary"
cmd = ["parted", disk, "--script", "mkpart", partition_type]
- if flag == 'swap':
+ if partition.flag == 'swap':
cmd.append("linux-swap")
cmd.append("%ss" % offset_sectors)
cmd.append("%ss" % (offset_sectors + length_sectors))
- if flag == 'boot':
+ if partition.flag == 'boot':
cmd.extend(['set', str(partnumber), 'boot', 'on'])
util.subp(cmd, capture=True)
elif disk_ptable == "gpt":
- if flag and flag in SGDISK_FLAGS:
- typecode = SGDISK_FLAGS[flag]
+ if partition.flag in SGDISK_FLAGS:
+ typecode = SGDISK_FLAGS[partition.flag]
else:
typecode = SGDISK_FLAGS['linux']
cmd = ["sgdisk", "--new", "%s:%s:%s" % (partnumber, offset_sectors,
@@ -1159,8 +1190,8 @@ def partition_handler(info, storage_config, context):
"--typecode=%s:%s" % (partnumber, typecode), disk]
util.subp(cmd, capture=True)
elif disk_ptable == "vtoc":
- dasd_pt = dasd.DasdPartitionTable.from_fdasd(disk)
- dasd_pt.add_partition(partnumber, length_bytes)
+ dasd_pt = DasdPartitionTable.from_fdasd(disk)
+ dasd_pt.add_partition(partnumber, partition.size)
else:
raise ValueError("parent partition has invalid partition table")
@@ -1180,7 +1211,7 @@ def partition_handler(info, storage_config, context):
block.rescan_block_devices([disk])
udevadm_settle(exists=part_path)
- wipe_mode = info.get('wipe')
+ wipe_mode = partition.wipe
if wipe_mode:
if wipe_mode == 'superblock' and create_partition:
# partition creation pre-wipes partition superblock locations
@@ -1190,29 +1221,37 @@ def partition_handler(info, storage_config, context):
block.wipe_volume(part_path, mode=wipe_mode, exclusive=False)
# Make the name if needed
- if storage_config.get(device).get('name') and partition_type != 'extended':
- make_dname(info.get('id'), storage_config)
+ if storage_config.get(partition.device).get('name') and \
+ partition_type != 'extended':
+ make_dname(partition.id, storage_config)
+
+
+@storage_actions.define("format")
+class Format:
+ volume: str
+ preserve: bool = storage_actions.boolean(default=False)
+ fstype: str
+ label: Optional[str] = None
+ uuid: Optional[str] = None
+ extra_options: Optional[List[str]] = None
def format_handler(info, storage_config, context):
- volume = info.get('volume')
- if not volume:
- raise ValueError("volume must be specified for partition '%s'" %
- info.get('id'))
+ format: Format = storage_actions.asobject(info)
# Get path to volume
- volume_path = get_path_to_storage_volume(volume, storage_config)
+ volume_path = get_path_to_storage_volume(format.volume, storage_config)
# Handle preserve flag
- if config.value_as_boolean(info.get('preserve')):
+ if format.preserve:
# Volume marked to be preserved, not formatting
return
# Make filesystem using block library
- LOG.debug("mkfs %s info: %s", volume_path, info)
- mkfs.mkfs_from_config(volume_path, info)
+ LOG.debug("mkfs %s info: %s", volume_path, format)
+ mkfs.mkfs_from_action(volume_path, format)
- device_type = storage_config.get(volume).get('type')
+ device_type = storage_config.get(format.volume)['type']
LOG.debug('Formated device type: %s', device_type)
if device_type == 'bcache':
# other devs have a udev watch on them. Not bcache (LP: #1680597).
diff --git a/curtin/storage_actions.py b/curtin/storage_actions.py
index b2a6fd4..d112645 100644
--- a/curtin/storage_actions.py
+++ b/curtin/storage_actions.py
@@ -29,5 +29,9 @@ def asobject(obj):
return config.fromdict(cls, obj)
+def boolean(*, default=attr.NOTHING):
+ return attr.ib(converter=config.value_as_boolean, default=default)
+
+
def size(*, default=attr.NOTHING):
return attr.ib(converter=_convert_size, default=default)
diff --git a/tests/unittests/test_block_mkfs.py b/tests/unittests/test_block_mkfs.py
index 163cee6..eb0ac9f 100644
--- a/tests/unittests/test_block_mkfs.py
+++ b/tests/unittests/test_block_mkfs.py
@@ -1,18 +1,27 @@
# This file is part of curtin. See LICENSE file for copyright and license info.
from curtin.block import mkfs
+from curtin.storage_actions import asobject
from .helpers import CiTestCase
from unittest import mock
+__import__("curtin.commands.block_meta")
+
+
class TestBlockMkfs(CiTestCase):
test_uuid = "fb26cc6c-ae73-11e5-9e38-2fb63f0c3155"
- def _get_config(self, fstype):
- return {"fstype": fstype, "type": "format", "id": "testfmt",
- "volume": "null", "label": "format1",
- "uuid": self.test_uuid}
+ def _get_action(self, fstype):
+ return asobject({
+ "fstype": fstype,
+ "type": "format",
+ "id": "testfmt",
+ "volume": "null",
+ "label": "format1",
+ "uuid": self.test_uuid,
+ })
def _assert_same_flags(self, call, expected):
print("call:\n{}".format(call))
@@ -38,7 +47,7 @@ class TestBlockMkfs(CiTestCase):
@mock.patch("curtin.block.mkfs.os")
@mock.patch("curtin.block.mkfs.util")
@mock.patch("curtin.block.mkfs.distro.lsb_release")
- def _run_mkfs_with_config(self, config, expected_cmd, expected_flags,
+ def _run_mkfs_with_action(self, action, expected_cmd, expected_flags,
mock_lsb_release, mock_util, mock_os, mock_block,
release="wily", strict=False):
# Pretend we are on wily as there are no known edge cases for it
@@ -46,7 +55,7 @@ class TestBlockMkfs(CiTestCase):
mock_os.path.exists.return_value = True
mock_block.get_blockdev_sector_size.return_value = (512, 512)
- mkfs.mkfs_from_config("/dev/null", config, strict=strict)
+ mkfs.mkfs_from_action("/dev/null", action, strict=strict)
self.assertTrue(mock_util.subp.called)
calls = mock_util.subp.call_args_list
self.assertEquals(len(calls), 1)
@@ -58,92 +67,83 @@ class TestBlockMkfs(CiTestCase):
self._assert_same_flags(call, expected_flags)
def test_mkfs_ext(self):
- conf = self._get_config("ext4")
+ fmt = self._get_action("ext4")
expected_flags = [["-L", "format1"], "-F",
["-U", self.test_uuid]]
- self._run_mkfs_with_config(conf, "mkfs.ext4", expected_flags)
+ self._run_mkfs_with_action(fmt, "mkfs.ext4", expected_flags)
def test_mkfs_ext_with_extra_options(self):
- conf = self._get_config("ext4")
+ fmt = self._get_action("ext4")
extra_options = ["-D", "-e", "continue"
"-E", "offset=1024,nodiscard,resize=10"]
- conf['extra_options'] = extra_options
+ fmt.extra_options = extra_options
expected_flags = [["-L", "format1"], "-F",
["-U", self.test_uuid]] + extra_options
- self._run_mkfs_with_config(conf, "mkfs.ext4", expected_flags)
+ self._run_mkfs_with_action(fmt, "mkfs.ext4", expected_flags)
def test_mkfs_f2fs(self):
- conf = self._get_config("f2fs")
+ fmt = self._get_action("f2fs")
expected_flags = [["-l", "format1"], "-f",
["-U", self.test_uuid]]
- self._run_mkfs_with_config(conf, "mkfs.f2fs", expected_flags)
+ self._run_mkfs_with_action(fmt, "mkfs.f2fs", expected_flags)
def test_mkfs_btrfs(self):
- conf = self._get_config("btrfs")
+ fmt = self._get_action("btrfs")
expected_flags = [["--label", "format1"], "--force",
["--uuid", self.test_uuid]]
- self._run_mkfs_with_config(conf, "mkfs.btrfs", expected_flags)
+ self._run_mkfs_with_action(fmt, "mkfs.btrfs", expected_flags)
def test_mkfs_xfs(self):
""" mkfs.xfs passes uuid parameter """
- conf = self._get_config("xfs")
+ fmt = self._get_action("xfs")
expected_flags = ['-f', ['-L', 'format1'],
['-m', 'uuid=%s' % self.test_uuid]]
- self._run_mkfs_with_config(conf, "mkfs.xfs", expected_flags)
+ self._run_mkfs_with_action(fmt, "mkfs.xfs", expected_flags)
def test_mkfs_btrfs_on_precise(self):
# Test precise+btrfs where there is no force or uuid
- conf = self._get_config("btrfs")
+ fmt = self._get_action("btrfs")
expected_flags = [["--label", "format1"]]
- self._run_mkfs_with_config(conf, "mkfs.btrfs", expected_flags,
+ self._run_mkfs_with_action(fmt, "mkfs.btrfs", expected_flags,
release="precise")
def test_mkfs_btrfs_on_trusty(self):
# Test trusty btrfs where there is no uuid
- conf = self._get_config("btrfs")
+ fmt = self._get_action("btrfs")
expected_flags = [["--label", "format1"], "--force"]
- self._run_mkfs_with_config(conf, "mkfs.btrfs", expected_flags,
+ self._run_mkfs_with_action(fmt, "mkfs.btrfs", expected_flags,
release="trusty")
def test_mkfs_fat(self):
- conf = self._get_config("fat32")
+ fmt = self._get_action("fat32")
expected_flags = ["-I", ["-n", "format1"], ["-F", "32"]]
- self._run_mkfs_with_config(conf, "mkfs.vfat", expected_flags)
+ self._run_mkfs_with_action(fmt, "mkfs.vfat", expected_flags)
def test_mkfs_vfat(self):
"""Ensure we can use vfat without fatsize"""
- conf = self._get_config("vfat")
+ fmt = self._get_action("vfat")
expected_flags = ["-I", ["-n", "format1"], ]
- self._run_mkfs_with_config(conf, "mkfs.vfat", expected_flags)
-
- def test_mkfs_invalid_fstype(self):
- """Do not proceed if fstype is None or invalid"""
- with self.assertRaises(ValueError):
- conf = self._get_config(None)
- self._run_mkfs_with_config(conf, "mkfs.ext4", [])
- with self.assertRaises(ValueError):
- conf = self._get_config("fakefilesystemtype")
- self._run_mkfs_with_config(conf, "mkfs.ext3", [])
+ self._run_mkfs_with_action(fmt, "mkfs.vfat", expected_flags)
def test_mkfs_invalid_label(self):
"""Do not proceed if filesystem label is too long"""
with self.assertRaises(ValueError):
- conf = self._get_config("ext4")
- conf['label'] = "thislabelislongerthan16chars"
- self._run_mkfs_with_config(conf, "mkfs.ext4", [], strict=True)
+ fmt = self._get_action("ext4")
+ fmt.label = "thislabelislongerthan16chars"
+ self._run_mkfs_with_action(fmt, "mkfs.ext4", [], strict=True)
- conf = self._get_config("swap")
+ fmt = self._get_action("swap")
expected_flags = ["--force", ["--label", "abcdefghijklmno"],
- ["--uuid", conf['uuid']]]
- conf['label'] = "abcdefghijklmnop" # 16 chars, 15 is max
+ ["--uuid", fmt.uuid]]
+ fmt.label = "abcdefghijklmnop" # 16 chars, 15 is max
# Raise error, do not truncate with strict = True
with self.assertRaises(ValueError):
- self._run_mkfs_with_config(conf, "mkswap", expected_flags,
+ self._run_mkfs_with_action(fmt, "mkswap", expected_flags,
strict=True)
# Do not raise with strict = False
- self._run_mkfs_with_config(conf, "mkswap", expected_flags)
+ self._run_mkfs_with_action(fmt, "mkswap", expected_flags)
@mock.patch("curtin.block.mkfs.block")
@mock.patch("curtin.block.mkfs.util")
diff --git a/tests/unittests/test_commands_block_meta.py b/tests/unittests/test_commands_block_meta.py
index 6388e0b..85ffcc2 100644
--- a/tests/unittests/test_commands_block_meta.py
+++ b/tests/unittests/test_commands_block_meta.py
@@ -14,7 +14,7 @@ import uuid
from curtin.block import dasd
from curtin.commands import block_meta, block_meta_v2
-from curtin import paths, util
+from curtin import paths, storage_actions, util
from .helpers import CiTestCase
@@ -1623,9 +1623,9 @@ class TestFstabVolumeSpec(CiTestCase):
class TestDasdHandler(CiTestCase):
- @patch('curtin.commands.block_meta.dasd.DasdDevice.devname')
- @patch('curtin.commands.block_meta.dasd.DasdDevice.format')
- @patch('curtin.commands.block_meta.dasd.DasdDevice.needs_formatting')
+ @patch('curtin.commands.block_meta.DasdDevice.devname')
+ @patch('curtin.commands.block_meta.DasdDevice.format')
+ @patch('curtin.commands.block_meta.DasdDevice.needs_formatting')
@patch('curtin.commands.block_meta.block')
@patch('curtin.commands.block_meta.util')
@patch('curtin.commands.block_meta.get_path_to_storage_volume')
@@ -1647,8 +1647,8 @@ class TestDasdHandler(CiTestCase):
set_label='cloudimg-rootfs',
mode='quick')
- @patch('curtin.commands.block_meta.dasd.DasdDevice.format')
- @patch('curtin.commands.block_meta.dasd.DasdDevice.needs_formatting')
+ @patch('curtin.commands.block_meta.DasdDevice.format')
+ @patch('curtin.commands.block_meta.DasdDevice.needs_formatting')
@patch('curtin.commands.block_meta.block')
@patch('curtin.commands.block_meta.util')
@patch('curtin.commands.block_meta.get_path_to_storage_volume')
@@ -1667,8 +1667,8 @@ class TestDasdHandler(CiTestCase):
block_meta.dasd_handler(info, storage_config, empty_context)
self.assertEqual(0, m_dasd_format.call_count)
- @patch('curtin.commands.block_meta.dasd.DasdDevice.format')
- @patch('curtin.commands.block_meta.dasd.DasdDevice.needs_formatting')
+ @patch('curtin.commands.block_meta.DasdDevice.format')
+ @patch('curtin.commands.block_meta.DasdDevice.needs_formatting')
@patch('curtin.commands.block_meta.block')
@patch('curtin.commands.block_meta.util')
@patch('curtin.commands.block_meta.get_path_to_storage_volume')
@@ -1688,8 +1688,8 @@ class TestDasdHandler(CiTestCase):
self.assertEqual(1, m_dasd_needf.call_count)
self.assertEqual(0, m_dasd_format.call_count)
- @patch('curtin.commands.block_meta.dasd.DasdDevice.format')
- @patch('curtin.commands.block_meta.dasd.DasdDevice.needs_formatting')
+ @patch('curtin.commands.block_meta.DasdDevice.format')
+ @patch('curtin.commands.block_meta.DasdDevice.needs_formatting')
@patch('curtin.commands.block_meta.block')
@patch('curtin.commands.block_meta.util')
@patch('curtin.commands.block_meta.get_path_to_storage_volume')
@@ -2749,7 +2749,8 @@ class TestPartitionHandler(CiTestCase):
block_meta.partition_handler(sconfig[1], oconfig, empty_context)
- m_verify_fdasd.assert_has_calls([call(devpath, 1, sconfig[1])])
+ m_verify_fdasd.assert_has_calls(
+ [call(devpath, 1, storage_actions.asobject(sconfig[1]))])
class TestMultipathPartitionHandler(CiTestCase):
@@ -2973,15 +2974,14 @@ class TestPartitionVerifySfdisk(CiTestCase):
self.add_patch(base + 'verify_ptable_flag', 'm_verify_ptable_flag')
self.add_patch(base + 'os.path.realpath', 'm_realpath')
self.m_realpath.side_effect = lambda x: x
- self.info = {
+ self.part_action = storage_actions.asobject({
'id': 'disk-sda-part-2',
'type': 'partition',
'device': 'sda',
'number': 2,
'size': '5GB',
'flag': 'boot',
- }
- self.part_size = int(util.human2bytes(self.info['size']))
+ })
self.devpath = self.random_string()
def test_partition_verify_sfdisk(self):
@@ -2990,24 +2990,24 @@ class TestPartitionVerifySfdisk(CiTestCase):
'node': devpath,
}
label = self.random_string()
- block_meta.partition_verify_sfdisk(self.info, label, sfdisk_part_info)
+ block_meta.partition_verify_sfdisk(self.part_action, label, sfdisk_part_info)
self.assertEqual(
- [call(devpath, self.part_size, sfdisk_part_info)],
+ [call(devpath, self.part_action.size, sfdisk_part_info)],
self.m_verify_size.call_args_list)
self.assertEqual(
- [call(devpath, self.info['flag'], label, sfdisk_part_info)],
+ [call(devpath, self.part_action.flag, label, sfdisk_part_info)],
self.m_verify_ptable_flag.call_args_list)
def test_partition_verify_skips_ptable_no_flag(self):
- del self.info['flag']
+ self.part_action.flag = None
devpath = self.random_string()
sfdisk_part_info = {
'node': devpath,
}
label = self.random_string()
- block_meta.partition_verify_sfdisk(self.info, label, sfdisk_part_info)
+ block_meta.partition_verify_sfdisk(self.part_action, label, sfdisk_part_info)
self.assertEqual(
- [call(devpath, self.part_size, sfdisk_part_info)],
+ [call(devpath, self.part_action.size, sfdisk_part_info)],
self.m_verify_size.call_args_list)
self.assertEqual([], self.m_verify_ptable_flag.call_args_list)
@@ -3479,7 +3479,7 @@ class TestPartitionVerifyFdasd(CiTestCase):
base = 'curtin.commands.block_meta.'
self.add_patch(base + 'verify_exists', 'm_verify_exists')
self.add_patch(
- base + 'dasd.DasdPartitionTable.from_fdasd', 'm_from_fdasd',
+ base + 'DasdPartitionTable.from_fdasd', 'm_from_fdasd',
autospec=False, spec=dasd.DasdPartitionTable.from_fdasd)
self.add_patch(base + 'verify_size', 'm_verify_size')
self.info = {
@@ -3504,7 +3504,8 @@ class TestPartitionVerifyFdasd(CiTestCase):
dasd.DasdPartition('', 0, 0, tracks_needed, '1', 'Linux'),
]
self.m_from_fdasd.side_effect = iter([fake_pt])
- block_meta.partition_verify_fdasd(self.devpath, 1, self.info)
+ block_meta.partition_verify_fdasd(
+ self.devpath, 1, storage_actions.asobject(self.info))
self.assertEqual(
[call(self.devpath)],
self.m_verify_exists.call_args_list)