← Back to team overview

cloud-init-dev team mailing list archive

Re: [Merge] ~smoser/cloud-init:feature/ds-identify-test into cloud-init:master

 


Diff comments:

> diff --git a/tests/unittests/test_ds_identify.py b/tests/unittests/test_ds_identify.py
> new file mode 100644
> index 0000000..00f91f3
> --- /dev/null
> +++ b/tests/unittests/test_ds_identify.py
> @@ -0,0 +1,275 @@
> +# This file is part of cloud-init. See LICENSE file for license information.
> +
> +import copy
> +import os
> +
> +from cloudinit import safeyaml
> +from cloudinit import util
> +from .helpers import CiTestCase, dir2dict, populate_dir
> +
> +UNAME_MYSYS = ("Linux bart 4.4.0-62-generic #83-Ubuntu "
> +               "SMP Wed Jan 18 14:10:15 UTC 2017 x86_64 GNU/Linux")
> +BLKID_EFI_ROOT = """
> +DEVNAME=/dev/sda1
> +UUID=8B36-5390
> +TYPE=vfat
> +PARTUUID=30d7c715-a6ae-46ee-b050-afc6467fc452
> +
> +DEVNAME=/dev/sda2
> +UUID=19ac97d5-6973-4193-9a09-2e6bbfa38262
> +TYPE=ext4
> +PARTUUID=30c65c77-e07d-4039-b2fb-88b1fb5fa1fc
> +"""
> +
> +DI_DEFAULT_POLICY = "search,found=all,maybe=all,notfound=enabled"
> +DI_DEFAULT_POLICY_NO_DMI = "search,found=all,maybe=all,notfound=disabled"
> +
> +SHELL_MOCK_TMPL = """\
> +%(name)s_out='%(out)s'
> +%(name)s_err='%(err)s'
> +%(name)s_ret='%(ret)s'
> +%(name)s() {
> +   local out='%(out)s'
> +   local err='%(err)s'
> +   local ret='%(ret)s'
> +   [ "${%(name)s_out}" = "_unset" ] || echo "${%(name)s_out}"
> +   [ "${%(name)s_err}" = "_unset" ] || echo "${%(name)s_err}"
> +   return ${%(name)s_ret}
> +}
> +"""
> +
> +SHELL_MOCK_TMPL = """\
> +%(name)s() {
> +   local out='%(out)s' err='%(err)s' r='%(ret)s'
> +   [ "$out" = "_unset" ] || echo "$out"
> +   [ "$err" = "_unset" ] || echo "$err" 2>&1
> +   return $r
> +}
> +"""
> +
> +UUIDS = [
> +    'e7db1e51-3eff-4ce9-8bfd-e8749884034a',
> +    '4167a101-a2fc-4071-a078-f9d89b3c548d',
> +    '147c295d-f5f4-4d8b-8b90-14682a6ae072',
> +]
> +
> +RC_FOUND = 0
> +RC_NOT_FOUND = 1
> +
> +P_PRODUCT_NAME = "sys/class/dmi/id/product_name"
> +P_PRODUCT_SERIAL = "sys/class/dmi/id/product_serial"
> +P_PRODUCT_UUID = "sys/class/dmi/id/product_uuid"
> +P_DSID_CFG = "etc/cloud/ds-identify.cfg"
> +
> +
> +class TestDsIdentify(CiTestCase):
> +    dsid_path = os.path.realpath('tools/ds-identify')
> +
> +    def call(self, rootd=None, mocks=None, args=None, files=None,
> +             policy_dmi=DI_DEFAULT_POLICY,
> +             policy_nodmi=DI_DEFAULT_POLICY_NO_DMI):
> +        if args is None:
> +            args = []
> +        if mocks is None:
> +            mocks = []
> +
> +        if files is None:
> +            files = {}
> +
> +        if rootd is None:
> +            rootd = self.tmp_dir()
> +
> +        unset = '_unset'
> +        wrap = self.tmp_path(path="_shwrap", dir=rootd)
> +        populate_dir(rootd, files)
> +
> +        # DI_DEFAULT_POLICY* are declared always as to not rely
> +        # on the default in the code.  This is because SRU releases change
> +        # the value in the code, and thus tests would fail there.
> +        head = [
> +            "DI_MAIN=noop",
> +            "DEBUG_LEVEL=2",
> +            "DI_LOG=stderr",
> +            "PATH_ROOT='%s'" % rootd,
> +            ". " + self.dsid_path,
> +            'DI_DEFAULT_POLICY="%s"' % policy_dmi,
> +            'DI_DEFAULT_POLICY_NO_DMI="%s"' % policy_nodmi,
> +            ""
> +        ]
> +
> +        def write_mock(data):
> +            ddata = {'out': None, 'err': None, 'ret': 0}
> +            ddata.update(data)
> +            for k in ddata:
> +                if ddata[k] is None:
> +                    ddata[k] = unset
> +            return SHELL_MOCK_TMPL % ddata
> +
> +        mocklines = []
> +        defaults = [
> +            {'name': 'detect_virt', 'out': None, 'ret': 1},
> +            {'name': 'uname', 'out': UNAME_MYSYS},
> +            {'name': 'blkid', 'out': BLKID_EFI_ROOT},
> +        ]
> +
> +        written = [d['name'] for d in mocks]
> +        for data in mocks:
> +            mocklines.append(write_mock(data))
> +        for d in defaults:
> +            if d['name'] not in written:
> +                mocklines.append(write_mock(d))
> +
> +        endlines = [
> +            'main %s' % ' '.join(['"%s"' % s for s in args])
> +        ]
> +
> +        with open(wrap, "w") as fp:
> +            fp.write('\n'.join(head + mocklines + endlines) + "\n")
> +
> +        rc = 0
> +        try:
> +            out, err = util.subp(['sh', '-c', '. %s' % wrap], capture=True)
> +        except util.ProcessExecutionError as e:
> +            rc = e.exit_code
> +            out = e.stdout
> +            err = e.stderr
> +
> +        cfg = None
> +        cfg_out = os.path.join(rootd, 'run/cloud-init/cloud.cfg')
> +        if os.path.exists(cfg_out):
> +            cfg = safeyaml.load(util.load_file(cfg_out))
> +
> +        return rc, out, err, cfg, dir2dict(rootd)
> +
> +    def _call_via_dict(self, data, rootd=None):
> +        # return output of self.call with a dict input like VALID_CFG[item]
> +        kwargs = {'rootd': rootd}
> +        for k in ('mocks', 'args', 'policy_dmi', 'policy_nodmi', 'files'):
> +            if k in data:
> +                kwargs[k] = data[k]
> +        return self.call(**kwargs)
> +
> +    def _test_valid(self, name):
> +        rc, _out, _err, cfg, _files = self._call_via_dict(
> +            copy.deepcopy(VALID_CFG[name]))
> +        # print({k: v for k, v in _files.items() if k != "/_shwrap"})
> +        self.assertEqual(RC_FOUND, rc)
> +        self.assertEqual([VALID_CFG[name]['ds'], 'None'],
> +                         cfg['datasource_list'])
> +
> +    def test_aws_ec2_hvm(self):
> +        self._test_valid('Ec2-hvm')
> +
> +    def test_aws_ec2_xen(self):
> +        # ec2: sys/hypervisor/uuid starts with ec2
> +        self._test_valid('Ec2-xen')
> +
> +    def test_brightbox_is_ec2(self):
> +        # ec2: product_serial ends with brightbox.com
> +        self._test_valid('Ec2-brightbox')
> +
> +    def test_gce_by_product_name(self):
> +        # gce identifies itself with product_name
> +        self._test_valid('GCE')
> +
> +    def test_gce_by_serial(self):
> +        # older gce compute instances must be identified by serial
> +        self._test_valid('GCE-serial')
> +
> +    def test_config_drive(self):
> +        self._test_valid('ConfigDrive')
> +        return
> +
> +    def test_policy_disabled(self):
> +        tmp_dir = self.tmp_dir()
> +        rc, out, err, cfg, _ = self.call(tmp_dir, policy_dmi="disabled")
> +        self.assertEqual(RC_NOT_FOUND, 1)
> +
> +    def test_policy_config_disable_overrides_builtin(self):
> +        mydata = copy.deepcopy(VALID_CFG['Ec2-hvm'])
> +        mydata['files'][P_DSID_CFG] = '\n'.join(['policy: disabled', ''])
> +        rc, out, err, cfg, files = self._call_via_dict(mydata)
> +        self.assertEqual(RC_NOT_FOUND, 1)
> +
> +    def test_single_entry_defines_datasource(self):
> +        # if config has a single entry in datasource_list it is accepted.
> +
> +        # test the valid Ec2-hvm command with the datasource defined in config.
> +        mydata = copy.deepcopy(VALID_CFG['Ec2-hvm'])
> +        cfgpath = 'etc/cloud/cloud.cfg.d/myds.cfg'
> +        mydata['files'][cfgpath] = 'datasource_list: ["NoCloud"]\n'
> +        rc, out, err, cfg, files = self._call_via_dict(mydata)
> +        self.assertEqual(RC_FOUND, rc)
> +        self.assertEqual(['NoCloud', 'None'], cfg['datasource_list'])
> +
> +
> +def blkdev(dev, fs=None, label=None, ptuuid=None, uuid=None):
> +    # a shortcut for writing input to blkid_out.
> +    if not dev.startswith("/dev/"):
> +        dev = "/dev/" + dev
> +
> +    ret = {'DEVNAME': dev}
> +    maps = (('TYPE', fs), ('LABEL', label), ('PARTUUID', ptuuid),
> +            ('UUID', uuid))
> +    for k, v in maps:
> +        if v:
> +            ret[k] = v
> +
> +    return ret
> +
> +
> +def blkid_out(disks=None):
> +    if disks is None:
> +        disks = []

I withdraw the above comment as it's a small enough method it's easy to understand intent.

> +    lines = []
> +    fields = ("DEVNAME", "UUID", "UUID_SUB", "TYPE", "PARTUUID", "LABEL")
> +    for d in disks:
> +        for key in [f for f in fields if f in d]:
> +            lines.append("%s=%s" % (key, d[key]))
> +        lines.append("")
> +    return '\n'.join(lines)
> +
> +
> +VALID_CFG = {
> +    'Ec2-hvm': {
> +        'ds': 'Ec2',
> +        'mocks': [{'name': 'detect_virt', 'out': 'kvm', 'ret': 0}],
> +        'files': {
> +            P_PRODUCT_SERIAL: 'ec23aef5-54be-4843-8d24-8c819f88453e\n',
> +            P_PRODUCT_UUID: 'EC23AEF5-54BE-4843-8D24-8C819F88453E\n',
> +        }
> +    },
> +    'Ec2-xen': {
> +        'ds': 'Ec2',
> +        'mocks': [{'name': 'detect_virt', 'out': 'xen', 'ret': 0}],
> +        'files': {
> +            'sys/hypervisor/uuid': 'ec2c6e2f-5fac-4fc7-9c82-74127ec14bbb\n'
> +        },
> +    },
> +    'Ec2-brightbox': {
> +        'ds': 'Ec2',
> +        'files': {P_PRODUCT_SERIAL: 'facc6e2f.brightbox.com\n'},
> +    },
> +    'GCE': {
> +        'ds': 'GCE',
> +        'files': {P_PRODUCT_NAME: 'Google Compute Engine\n'},
> +    },
> +    'GCE-serial': {
> +        'ds': 'GCE',
> +        'files': {P_PRODUCT_SERIAL: 'GoogleCloud-8f2e88f\n'},
> +    },
> +    'ConfigDrive': {
> +        'ds': 'ConfigDrive',
> +        'mocks': [
> +            {'name': 'blkid', 'ret': 0,
> +             'out': blkid_out(
> +                 [blkdev('vda1', fs='vfat', ptuuid=UUIDS[0]),
> +                  blkdev('vda2', fs='ext4', label='cloudimg-rootfs',
> +                         ptuuid=UUIDS[1]),
> +                  blkdev('vdb', fs='vfat', label='config-2')])
> +             },
> +        ],
> +    },
> +}
> +
> +# vi: ts=4 expandtab


-- 
https://code.launchpad.net/~smoser/cloud-init/+git/cloud-init/+merge/323059
Your team cloud init development team is requested to review the proposed merge of ~smoser/cloud-init:feature/ds-identify-test into cloud-init:master.


References