← Back to team overview

cloud-init-dev team mailing list archive

Re: [Merge] ~powersj/cloud-init:cii-enable-ec2 into cloud-init:master

 

Looks great Josh thanks. I have some inline comments to resolve regarding using filters to find existing cloud-init resources in ec2 and using waiter methods. I've added a couple of discussion points about the potential of reusing or updating a named/tagged ssh key instead of creating new ones. But, maybe we need to have that ssh discussion as a squad instead of on this branch.

Also a patch for better informing us about the required aws configuration files:
 http://paste.ubuntu.com/26190886/

Diff comments:

> diff --git a/doc/rtd/topics/tests.rst b/doc/rtd/topics/tests.rst
> index d668e3f..730df29 100644
> --- a/doc/rtd/topics/tests.rst
> +++ b/doc/rtd/topics/tests.rst
> @@ -392,6 +392,32 @@ Development Checklist
>             --test modules/your_test.yaml \
>             [--deb <build of cloud-init>]
>  
> +
> +Platforms
> +=========
> +
> +EC2
> +---
> +To run on the EC2 platform it is required that the user has an AWS credentials
> +configuration file specifying his or her access keys and a default region.
> +These configuration files are the standard that the AWS cli and other AWS
> +tools utilize for interacting directly with AWS itself and are normally
> +generated when running ``aws configure``:
> +
> +.. code-block:: bash
> +
> +    $ cat $HOME/.aws/config

this should be $HOME/.aws/credentials

> +    [default]
> +    aws_access_key_id = <KEY HERE>
> +    aws_secret_access_key = <KEY HERE>
> +
> +.. code-block:: bash
> +
> +    $ cat $HOME/.aws/config
> +    [default]
> +    region = us-west-2
> +
> +
>  Architecture
>  ============
>  
> diff --git a/tests/cloud_tests/platforms/ec2/image.py b/tests/cloud_tests/platforms/ec2/image.py
> new file mode 100644
> index 0000000..456dda2
> --- /dev/null
> +++ b/tests/cloud_tests/platforms/ec2/image.py
> @@ -0,0 +1,105 @@
> +# This file is part of cloud-init. See LICENSE file for license information.
> +
> +"""EC2 Image Base Class."""
> +
> +from ..images import Image
> +from .snapshot import EC2Snapshot
> +
> +
> +class EC2Image(Image):
> +    """EC2 backed image."""
> +
> +    platform_name = 'ec2'
> +
> +    def __init__(self, platform, config, image_ami):
> +        """Set up image.
> +
> +        @param platform: platform object
> +        @param config: image configuration
> +        @param image_ami: string of image ami ID
> +        """
> +        super(EC2Image, self).__init__(platform, config)
> +        self._img_instance = None
> +        self.image_ami = image_ami
> +        self.image_ami_edited = False
> +
> +    @property
> +    def _instance(self):
> +        """Internal use only, returns a running instance"""
> +        if not self._img_instance:
> +            self._img_instance = self.platform.create_instance(
> +                self.properties, self.config, self.features,
> +                self.image_ami, b'')

For clarity on this callsite, you could just use kwargs
src_img_path=self.image_ami, user_data=b'')

> +            self._img_instance.start(wait=True, wait_for_cloud_init=True)
> +            self.modified = True
> +        return self._img_instance
> +
> +    @property
> +    def properties(self):
> +        """Dictionary containing: 'arch', 'os', 'version', 'release'."""
> +        return {
> +            'arch': self.config['arch'],
> +            'os': self.config['family'],
> +            'release': self.config['release'],
> +            'version': self.config['version'],
> +        }
> +
> +    def destroy(self):
> +        """Unset path to signal image is no longer used.
> +
> +        The removal of the images and all other items is handled by the
> +        framework. In some cases we want to keep the images, so let the
> +        framework decide whether to keep or destroy everything.
> +        """
> +        if self.image_ami_edited:
> +            self.platform.ec2_client.deregister_image(
> +                ImageId=self.image_ami_edited
> +            )
> +
> +        super(EC2Image, self).destroy()
> +
> +    def _execute(self, *args, **kwargs):
> +        """Execute command in image, modifying image."""
> +        return self._instance._execute(*args, **kwargs)
> +
> +    def push_file(self, local_path, remote_path):
> +        """Copy file at 'local_path' to instance at 'remote_path'."""
> +        return self._instance.push_file(local_path, remote_path)
> +
> +    def run_script(self, *args, **kwargs):
> +        """Run script in image, modifying image.
> +
> +        @return_value: script output
> +        """
> +        return self._instance.run_script(*args, **kwargs)
> +
> +    def snapshot(self):
> +        """Create snapshot of image, block until done.
> +
> +        Will return base image_ami if no instance has been booted, otherwise
> +        will run the clean script, shutdown the instance, create a custom
> +        AMI, and use that AMI once available.
> +        """
> +        if not self._img_instance:
> +            return EC2Snapshot(self.platform, self.properties, self.config,
> +                               self.features, self.image_ami)
> +
> +        if self.config.get('boot_clean_script'):
> +            self._img_instance.run_script(self.config.get('boot_clean_script'))
> +
> +        self._img_instance.shutdown(wait=True)
> +
> +        name = '%s-%s-%s' % (
> +            self.platform.tag, self.image_ami, self.platform.uuid
> +        )
> +        response = self.platform.ec2_client.create_image(
> +            Name=name, InstanceId=self._img_instance.instance.instance_id
> +        )
> +        self.image_ami_edited = response['ImageId']
> +        image = self.platform.ec2_resource.Image(self.image_ami_edited)
> +        self.platform.wait_for_state(image, 'available')
> +
> +        return EC2Snapshot(self.platform, self.properties, self.config,
> +                           self.features, self.image_ami_edited)
> +
> +# vi: ts=4 expandtab
> diff --git a/tests/cloud_tests/platforms/ec2/platform.py b/tests/cloud_tests/platforms/ec2/platform.py
> new file mode 100644
> index 0000000..55dfd0d
> --- /dev/null
> +++ b/tests/cloud_tests/platforms/ec2/platform.py
> @@ -0,0 +1,260 @@
> +# This file is part of cloud-init. See LICENSE file for license information.
> +
> +"""Base EC2 platform."""
> +import os
> +import time
> +import uuid
> +
> +import boto3
> +import botocore
> +
> +from ..platforms import Platform
> +from .image import EC2Image
> +from .instance import EC2Instance
> +from cloudinit import util as c_util
> +from tests.cloud_tests import util
> +
> +
> +class EC2Platform(Platform):
> +    """EC2 test platform."""
> +
> +    platform_name = 'ec2'
> +    ipv4_cidr = '192.168.1.0/20'
> +
> +    def __init__(self, config):
> +        """Set up platform."""
> +        super(EC2Platform, self).__init__(config)
> +        # Used with SSH key and custom AMI generation naming with EC2
> +        self.uuid = str(uuid.uuid1())[0:8]
> +        self.tag = config['tag']
> +
> +        self.ec2_client = boto3.client('ec2')
> +        self.ec2_resource = boto3.resource('ec2')
> +        self.ec2_region = boto3.Session().region_name
> +        self.instance_type = config['instance-type']
> +        self.key_name = self._upload_public_key(config)
> +        self.vpc = self._setup_vpc()
> +        self.vpc_security_group = self._get_security_group()
> +        self.vpc_subnet = self._get_subnet()
> +
> +    def create_instance(self, properties, config, features,
> +                        image_ami, user_data):
> +        """Create an instance
> +
> +        @param src_img_path: image path to launch from
> +        @param properties: image properties
> +        @param config: image configuration
> +        @param features: image features
> +        @param image_ami: string of image ami ID
> +        @param user_data: test user-data to pass to instance
> +        @return_value: cloud_tests.instances instance
> +        """
> +        return EC2Instance(self, properties, config, features,
> +                           image_ami, user_data)
> +
> +    def destroy(self):
> +        """Delete platform."""
> +        if self.key_name:
> +            self.ec2_client.delete_key_pair(KeyName=self.key_name)
> +
> +    def get_image(self, img_conf):
> +        """Get image using specified image configuration.
> +
> +        @param img_conf: configuration for image
> +        @return_value: cloud_tests.images instance
> +        """
> +        if img_conf['root-store'] == 'ebs':
> +            root_store = 'ssd'
> +        elif img_conf['root-store'] == 'instance-store':
> +            root_store = 'instance'
> +        else:
> +            raise RuntimeError('Unknown root-store type: %s' %
> +                               (img_conf['root-store']))
> +
> +        filters = [
> +            'arch=%s' % c_util.get_architecture(),
> +            'endpoint=https://ec2.%s.amazonaws.com' % self.ec2_region,
> +            'region=%s' % self.ec2_region,
> +            'release=%s' % img_conf['release'],
> +            'root_store=%s' % root_store,
> +            'virt=hvm',
> +        ]
> +
> +        image = self._query_streams(img_conf, filters)
> +
> +        try:
> +            image_ami = image['id']
> +        except KeyError:
> +            raise RuntimeError('No images found for %s!' % img_conf['release'])
> +
> +        image = EC2Image(self, img_conf, image_ami)
> +        return image
> +
> +    @staticmethod
> +    def wait_for_state(resource, goal_state):
> +        """Wait up to 5 minutes (5 * 60) for a specific state.
> +
> +        @param: resource to check with resource.state
> +        @param: string goal state to be in

docstring typos:
 @param resource: resource upon which we wait
 @param goal_state: string goal_state we will wait for

Agreed on dropping this loop, in launch-ec2 script we do the following with instances, which would apply here I think:

    print('Waiting for EC2 instance initialization')
    resource.wait_until_running(
        Filters=[{'Name': 'instance-state-name', 'Values': ['running']}])
    resource.reload()  # refreshes the latest values locally



It seems we can use wait_until_(running|stopped|available)  for the use-cases you currently have drawn up.

> +        """
> +        attempts = 0
> +
> +        while attempts < 60:
> +            try:
> +                resource.reload()
> +                current_state = resource.state
> +                # Sometimes get {'Code': 0, 'Name': 'pending'}
> +                if isinstance(current_state, dict):
> +                    current_state = current_state['Name']
> +
> +                if current_state == goal_state:
> +                    return
> +            # will occur if instance is not up yet
> +            except botocore.exception.BotoSeverError:

This exception doesn't exist (and has a typo). It should be something else or maybe BotoCoreError superclass if we don't have one specified? May not matter if you drop this function to use the waiter.

> +                pass
> +
> +            time.sleep(5)
> +            attempts += 1
> +
> +        raise util.InTargetExecuteError(
> +            b'', b'', 1, goal_state, 'wait_for_state',
> +            reason='%s: took too long to get to %s' % (resource, goal_state)
> +        )
> +
> +    def _create_internet_gateway(self, vpc):
> +        """Create Internet Gateway and assign to VPC.
> +
> +        @param vpc: VPC to create internet gateway on
> +        """
> +        internet_gateway = self.ec2_resource.create_internet_gateway()
> +        internet_gateway.attach_to_vpc(VpcId=vpc.vpc_id)
> +        self._tag_resource(internet_gateway)
> +        return internet_gateway
> +
> +    def _create_subnet(self, vpc):
> +        """Generate IPv4 and IPv6 subnets for use.
> +
> +        @param vpc: VPC to create subnets on
> +        """
> +        ipv6_block = vpc.ipv6_cidr_block_association_set[0]['Ipv6CidrBlock']
> +        ipv6_cidr = ipv6_block[:-2] + '64'
> +
> +        subnet = vpc.create_subnet(CidrBlock=self.ipv4_cidr,
> +                                   Ipv6CidrBlock=ipv6_cidr)
> +        modify_subnet = subnet.meta.client.modify_subnet_attribute
> +        modify_subnet(SubnetId=subnet.id,
> +                      MapPublicIpOnLaunch={'Value': True})
> +        self._tag_resource(subnet)
> +
> +    def _get_security_group(self):
> +        """Return default security group from VPC.
> +
> +        For now there should only be one.
> +
> +        @return_value: Return first security group object
> +        """
> +        return list(self.vpc.security_groups.all())[0]

We should do this search using a filter so we don't collide with other groups in existing accounts

from launch-ec2:
    existing_groups = [
        group for group in self.vpc.security_groups.filter(
            Filters=[{'Name': 'tag-key', 'Values': ['Name']},
                     {'Name': 'tag-value', 'Values': [self.tag]}])]

> +
> +    def _get_subnet(self):
> +        """Return first subnet from VPC.
> +
> +        For now there should only be one.
> +
> +        @return_value: Return subnet object
> +        """
> +        return list(self.vpc.subnets.all())[0]
> +
> +    def _setup_vpc(self):
> +        """Setup AWS EC2 VPC or return existing VPC."""
> +        for vpc in self.ec2_resource.vpcs.all():

Instead of the loop, we can use filters too here:
    vpcs = self.ec2_resource.vpcs.filter(Filters=[{'Name': 'tag:Name', 'Values': [self.tag]}])
    if vpcs:
        return vpcs[0]

> +            if not vpc.tags:
> +                continue
> +            for tag in vpc.tags:
> +                if tag['Value'] == self.tag:
> +                    return vpc
> +
> +        vpc = self.ec2_resource.create_vpc(
> +            CidrBlock=self.ipv4_cidr,
> +            AmazonProvidedIpv6CidrBlock=True
> +        )
> +        vpc.wait_until_available()
> +        self._tag_resource(vpc)
> +
> +        internet_gateway = self._create_internet_gateway(vpc)
> +        self._create_subnet(vpc)
> +        self._update_routing_table(vpc, internet_gateway.id)
> +        self._update_security_group(vpc)
> +
> +        return vpc
> +
> +    def _tag_resource(self, resource):

Great process for resource tracking. Was easy to find and remove any artifacts from my existing ec2 account.

> +        """Tag a resouce with the specified tag.
> +
> +        This makes finding and deleting resources specific to this testing
> +        much easier to find.
> +
> +        @param resource: resource to tag"""
> +        tag = {
> +            'Key': 'Name',
> +            'Value': self.tag
> +        }
> +        resource.create_tags(Tags=[tag])
> +
> +    def _update_routing_table(self, vpc, internet_gateway_id):
> +        """Update default routing table with internet gateway.
> +
> +        This sets up internet access between the VPC via the internet gateway
> +        by configuring routing tables for IPv4 and IPv6.
> +
> +        @param vpc: VPC containing routing table to update
> +        @param internet_gateway_id: gateway ID to use for routing
> +        """
> +        route_table = list(vpc.route_tables.all())[0]
> +        route_table.create_route(DestinationCidrBlock='0.0.0.0/0',
> +                                 GatewayId=internet_gateway_id)
> +        route_table.create_route(DestinationIpv6CidrBlock='::/0',
> +                                 GatewayId=internet_gateway_id)
> +        self._tag_resource(route_table)
> +
> +    def _update_security_group(self, vpc):
> +        """Allow only SSH inbound to default VPC security group.
> +
> +        This revokes the initial accept all permissions and only allows
> +        the SSH inbound.
> +
> +        @param vpc: VPC containing security group to update
> +        """
> +        ssh = {
> +            'IpProtocol': 'TCP',
> +            'FromPort': 22,
> +            'ToPort': 22,
> +            'IpRanges': [{'CidrIp': '0.0.0.0/0'}],
> +            'Ipv6Ranges': [{'CidrIpv6': '::/0'}]
> +        }
> +
> +        security_group = list(vpc.security_groups.all())[0]
> +        security_group.revoke_ingress(
> +            IpPermissions=security_group.ip_permissions
> +        )
> +        security_group.authorize_ingress(
> +            IpPermissions=[ssh]
> +        )
> +        self._tag_resource(security_group)
> +
> +    def _upload_public_key(self, config):

@Josh, @Ryan, @Scott, Maybe this is bike-shedding, I'd really like to see us only generate one stock named key here, and replace that named key on each test run.  

We can check for the existing named key and replace it if we run the tests again

        pairs = [k for k in self.ec2_resource.key_pairs.filter(
            KeyNames=[name])]
        if pairs:
            return pairs[0]  # We already have the key either return, or replace it depending on our use case.

> +        """Generate random name and upload SSH key with that name.
> +
> +        @param config: platform config
> +        @return: string of ssh key name
> +        """
> +        key_file = os.path.join(config['data_dir'], config['public_key'])
> +        with open(key_file, 'r') as file:
> +            public_key = file.read().strip('\n')
> +
> +        name = '%s-%s' % (self.tag, self.uuid)
> +        self.ec2_client.import_key_pair(KeyName=name,
> +                                        PublicKeyMaterial=public_key)
> +
> +        return name
> +
> +# vi: ts=4 expandtab
> diff --git a/tests/cloud_tests/platforms/instances.py b/tests/cloud_tests/platforms/instances.py
> index 8c59d62..6918003 100644
> --- a/tests/cloud_tests/platforms/instances.py
> +++ b/tests/cloud_tests/platforms/instances.py
> @@ -1,14 +1,22 @@
>  # This file is part of cloud-init. See LICENSE file for license information.
>  
>  """Base instance."""
> +import time
> +
> +import paramiko
> +from paramiko.ssh_exception import (BadHostKeyException,
> +                                    AuthenticationException,

Could pull these 3 imports to one line below I think
from paramiko.ssh_exception import (
    BadHostKeyException, AuthenticationException, SSHException)

> +                                    SSHException)
>  
>  from ..util import TargetBase
> +from tests.cloud_tests import util
>  
>  
>  class Instance(TargetBase):
>      """Base instance object."""
>  
>      platform_name = None
> +    _ssh_client = None
>  
>      def __init__(self, platform, name, properties, config, features):
>          """Set up instance.
> @@ -49,6 +61,58 @@ class Instance(TargetBase):
>          """Clean up instance."""
>          pass
>  
> +    def _ssh(self, command, stdin=None):
> +        """Run a command via SSH."""
> +        client = self._ssh_connect()
> +
> +        cmd = util.shell_pack(command)
> +        try:
> +            fp_in, fp_out, fp_err = client.exec_command(cmd)
> +            channel = fp_in.channel
> +
> +            if stdin is not None:
> +                fp_in.write(stdin)
> +                fp_in.close()
> +
> +            channel.shutdown_write()
> +            rc = channel.recv_exit_status()
> +        except SSHException as e:
> +            raise util.InTargetExecuteError(b'', b'', 1, command, 'ssh',
> +                                            reason=e)
> +
> +        return (fp_out.read(), fp_err.read(), rc)
> +
> +    def _ssh_connect(self):
> +        """Connect via SSH."""
> +        if self._ssh_client:
> +            return self._ssh_client
> +
> +        if not self.ssh_ip or not self.ssh_port:
> +            raise ValueError
> +
> +        client = paramiko.SSHClient()
> +        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
> +        private_key = paramiko.RSAKey.from_private_key_file(self.ssh_key_file)
> +
> +        retries = 30
> +        while retries:
> +            try:
> +                client.connect(username='ubuntu', hostname=self.ssh_ip,
> +                               port=self.ssh_port, pkey=private_key,
> +                               banner_timeout=30)
> +                self._ssh_client = client
> +                return client
> +            except (ConnectionRefusedError, AuthenticationException,
> +                    BadHostKeyException, ConnectionResetError, SSHException,
> +                    OSError) as e:
> +                retries -= 1

Is it possible for us to log these retries? LOG.debug('Retrying ssh connection to instance {num_retries} more times'.format(num_retries=retries))
Judging from how we raise the ssh_cmd, maybe logging is harder than it looks.

> +                time.sleep(10)
> +
> +        ssh_cmd = 'Failed command to: ubuntu@%s:%s after 300 seconds' % (
> +            self.ssh_ip, self.ssh_port
> +        )
> +        raise util.InTargetExecuteError(b'', b'', 1, ssh_cmd, 'ssh')
> +
>      def _wait_for_system(self, wait_for_cloud_init):
>          """Wait until system has fully booted and cloud-init has finished.
>  
> diff --git a/tests/cloud_tests/platforms/platforms.py b/tests/cloud_tests/platforms/platforms.py
> index 2897536..aa88d53 100644
> --- a/tests/cloud_tests/platforms/platforms.py
> +++ b/tests/cloud_tests/platforms/platforms.py
> @@ -24,4 +31,63 @@ class Platform(object):
>          """Clean up platform data."""
>          pass
>  
> +    def _generate_ssh_keys(self, data_dir):
> +        """Generate SSH keys to be used with image."""
> +        filename = os.path.join(data_dir, 'id_rsa')

If we end up naming ssh keys in a given platform something specific for the given test run, maybe it's worth us also generating that key name here too so the person who spun up the test has reference to the key that was used on run X. I don't really know how to resolve this if we are pushing up multiple keys to a platform so that we can potentially reference instances that are still left alive if we preserved them.

> +
> +        if os.path.exists(filename):
> +            c_util.del_file(filename)
> +
> +        c_util.subp(['ssh-keygen', '-t', 'rsa', '-b', '4096',
> +                     '-f', filename, '-P', '',
> +                     '-C', 'ubuntu@cloud_test'],
> +                    capture=True)
> +
> +    @staticmethod
> +    def _query_streams(img_conf, img_filter):
> +        """Query streams for latest image given a specific filter.
> +
> +        @param img_conf: configuration for image
> +        @param filters: array of filters as strings format 'key=value'
> +        @return: dictionary with latest image information or empty
> +        """
> +        def policy(content, path):
> +            return s_util.read_signed(content, keyring=img_conf['keyring'])
> +
> +        (url, path) = s_util.path_from_mirror_url(img_conf['mirror_url'], None)
> +        smirror = mirrors.UrlMirrorReader(url, policy=policy)
> +
> +        config = {'max_items': 1, 'filters': filters.get_filters(img_filter)}
> +        tmirror = FilterMirror(config)
> +        tmirror.sync(smirror, path)
> +
> +        return tmirror.json_entries[0]
> +
> +
> +class FilterMirror(mirrors.BasicMirrorWriter):
> +    """Taken from sstream-query to return query result as json array."""
> +
> +    def __init__(self, config=None):
> +        super(FilterMirror, self).__init__(config=config)
> +        if config is None:
> +            config = {}
> +        self.config = config
> +        self.filters = config.get('filters', [])
> +        self.json_entries = []
> +
> +    def load_products(self, path=None, content_id=None):
> +        return {'content_id': content_id, 'products': {}}
> +
> +    def filter_item(self, data, src, target, pedigree):
> +        return filters.filter_item(self.filters, data, src, pedigree)
> +
> +    def insert_item(self, data, src, target, pedigree, contentsource):
> +        # src and target are top level products:1.0
> +        # data is src['products'][ped[0]]['versions'][ped[1]]['items'][ped[2]]
> +        # contentsource is a ContentSource if 'path' exists in data or None
> +        data = s_util.products_exdata(src, pedigree)
> +        if 'path' in data:
> +            data.update({'item_url': contentsource.url})
> +        self.json_entries.append(data)
> +
>  # vi: ts=4 expandtab


-- 
https://code.launchpad.net/~powersj/cloud-init/+git/cloud-init/+merge/335186
Your team cloud-init commiters is requested to review the proposed merge of ~powersj/cloud-init:cii-enable-ec2 into cloud-init:master.


References