cloud-init-dev team mailing list archive
cloud-init-dev team
Mailing list archive
Message #02726
[Merge] ~jcastets/cloud-init:scaleway-datasource into cloud-init:master
Julien Castets has proposed merging ~jcastets/cloud-init:scaleway-datasource into cloud-init:master.
Requested reviews:
cloud-init commiters (cloud-init-dev)
For more details, see:
Implements Scaleway datasource with user and vendor data.
Your team cloud-init commiters is requested to review the proposed merge of ~jcastets/cloud-init:scaleway-datasource into cloud-init:master.
diff --git a/cloudinit/sources/ b/cloudinit/sources/
new file mode 100644
index 0000000..7ac1b1a
--- /dev/null
+++ b/cloudinit/sources/
@@ -0,0 +1,264 @@
+# vi: ts=4 expandtab
+# Author: Julien Castets <castets.j@xxxxxxxxx>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import json
+import socket
+import time
+import requests
+# pylint fails to import the two modules below.
+# pylint: disable=E0401
+from requests.packages.urllib3.connection import HTTPConnection
+from requests.packages.urllib3.poolmanager import PoolManager
+from cloudinit import log as logging
+from cloudinit import sources
+from cloudinit import url_helper
+from cloudinit import util
+LOG = logging.getLogger(__name__)
+ 'metadata_url': '',
+ 'userdata_url': '',
+ 'vendordata_url': ''
+def on_scaleway(user_data_url, retries):
+ """
+ Check if we are on Scaleway.
+ The only backward-compatible way to check if you are on a Scaleway instance
+ is to query the metadata API. For security reasons, the special endpoint
+ /user_data/ can only be accessed from a privileged TCP source port (ie.
+ below 1024), otherwise the API returns a HTTP/403.
+ In other words, on Scaleway:
+ #> curl
+ [...] 403 error
+ #> curl --local-port 1-1024
+ [...] 200 OK
+ This function queries the endpoint /user_data/ and returns True if a
+ HTTP/403 is returned.
+ """
+ tries = max(int(retries) + 1, 1)
+ for ntry in range(tries):
+ try:
+ code = requests.head(user_data_url).status_code
+ if code == 403:
+ return True
+ # If we are rate limited or if there is a server error, we might
+ # not be talking to the Scaleway metadata API and we need to try
+ # again.
+ if code != 429 and code < 500:
+ return False
+ # Couldn't query the API.
+ except (requests.exceptions.ConnectionError,
+ requests.exceptions.Timeout):
+ return False
+ # Be nice, and wait a bit before retrying.
+ time.sleep(5)
+ return False
+class SourceAddressAdapter(requests.adapters.HTTPAdapter):
+ """
+ Adapter for requests to choose the local address to bind to.
+ """
+ def __init__(self, source_address, **kwargs):
+ self.source_address = source_address
+ super(SourceAddressAdapter, self).__init__(**kwargs)
+ def init_poolmanager(self, connections, maxsize, block=False):
+ socket_options = HTTPConnection.default_socket_options + [
+ (socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ ]
+ self.poolmanager = PoolManager(num_pools=connections,
+ maxsize=maxsize,
+ block=block,
+ source_address=self.source_address,
+ socket_options=socket_options)
+def _get_type_data(typedata_address, timeout, requests_session):
+ """
+ Retrieve user data or vendor data.
+ Scaleway userdata/vendordata API returns HTTP/404 if user/vendor data is
+ not set.
+ This function calls `url_helper.readurl` but instead of considering
+ HTTP/404 as an error that requires a retry, it considers it as empty
+ user/vendor data.
+ Also, be aware the user data/vendor API requires the source port to be
+ below 1024. If requests raises ConnectionError (EADDRINUSE), the caller
+ should retry to call this function on an other port.
+ """
+ try:
+ resp = url_helper.readurl(
+ typedata_address,
+ data=None,
+ timeout=timeout,
+ # It's the caller's responsability to recall this function in case
+ # of exception. Don't let url_helper.readurl() retry by itself.
+ retries=0,
+ session=requests_session,
+ # If the error is a HTTP/404 or a ConnectionError, go into raise
+ # block below.
+ exception_cb=lambda _, exc: exc.code == 404 or (
+ isinstance(exc.cause, requests.exceptions.ConnectionError) and
+ 'Address already in use' in exc.message
+ )
+ )
+ return util.decode_binary(resp.contents)
+ except url_helper.UrlError as exc:
+ # Empty user data.
+ if exc.code == 404:
+ return None
+ raise
+# Datasource that reads instance metadata, user data and vendor data from the
+# URLs configured under the "datasource" / "Scaleway" section of the system
+# configuration.
+class DataSourceScaleway(sources.DataSource):
+ # Read the datasource configuration and remember the three API URLs plus the
+ # retry and timeout settings used by every query.
+ def __init__(self, sys_cfg, distro, paths):
+ LOG.debug('Init Scaleway')
+ sources.DataSource.__init__(self, sys_cfg, distro, paths)
+ self.metadata = {}
+ self.ds_cfg = util.mergemanydict([
+ util.get_cfg_by_path(sys_cfg, ["datasource", "Scaleway"], {}),
+ ])
+ # NOTE(review): these lookups raise KeyError if a URL is absent from the
+ # merged configuration -- confirm built-in defaults are merged in.
+ self.metadata_address = self.ds_cfg['metadata_url']
+ self.userdata_address = self.ds_cfg['userdata_url']
+ self.vendordata_address = self.ds_cfg['vendordata_url']
+ # DEF_MD_RETRIES and DEF_MD_TIMEOUT are not defined in the visible text.
+ self.retries = self.ds_cfg.get('retries', DEF_MD_RETRIES)
+ self.timeout = self.ds_cfg.get('timeout', DEF_MD_TIMEOUT)
+ # Fetch user data (type_ == 'user') or vendor data (type_ == 'vendor') from
+ # a chosen local source port. Returns whatever _get_type_data() returns;
+ # re-raises the last UrlError once every port has been tried.
+ def _get_privileged_type(self, type_):
+ assert type_ in ('user', 'vendor')
+ type_address = self.userdata_address \
+ if type_ == 'user' else self.vendordata_address
+ # Query user/vendor-data. Try to make a request on the first privileged
+ # port available.
+ # Ports tried are 1 .. max(retries, 2) - 1, so at least port 1 is tried.
+ for port in range(1, max(int(self.retries), 2)):
+ try:
+ LOG.debug(
+ 'Trying to get %s data (bind on port %d)...', type_, port
+ )
+ # A fresh session per attempt, with the adapter given the local port
+ # as its source address.
+ requests_session = requests.Session()
+ requests_session.mount(
+ 'http://',
+ SourceAddressAdapter(source_address=('', port))
+ )
+ data = _get_type_data(
+ type_address,
+ timeout=self.timeout,
+ requests_session=requests_session
+ )
+ LOG.debug('%s-data downloaded', type_)
+ return data
+ except url_helper.UrlError as exc:
+ # Local port already in use or HTTP/429.
+ # Wait 5 seconds, keep the error and move on to the next port.
+ time.sleep(5)
+ last_exc = exc
+ continue
+ # Max number of retries reached.
+ raise last_exc
+ # Download the metadata document, parse it as JSON and add the user data
+ # and vendor data to it under 'user-data' and 'vendor-data'.
+ def _get_metadata(self):
+ resp = url_helper.readurl(self.metadata_address,
+ timeout=self.timeout,
+ retries=self.retries)
+ metadata = json.loads(util.decode_binary(resp.contents))
+ metadata['user-data'] = self._get_privileged_type('user')
+ metadata['vendor-data'] = self._get_privileged_type('vendor')
+ return metadata
+ # Return False when on_scaleway() reports we are not on Scaleway. Otherwise
+ # fill self.metadata from the API and return True.
+ def get_data(self):
+ if on_scaleway(self.ds_cfg['userdata_url'], self.retries) is False:
+ return False
+ metadata = self._get_metadata()
+ # Keep only the fields the accessors below read; each SSH key entry is
+ # reduced to the value stored under its 'key' field.
+ self.metadata = {
+ 'id': metadata['id'],
+ 'hostname': metadata['hostname'],
+ 'user-data': metadata['user-data'],
+ 'vendor-data': metadata['vendor-data'],
+ 'ssh_public_keys': [
+ key['key'] for key in metadata['ssh_public_keys']
+ ]
+ }
+ return True
+ # Always None for this datasource.
+ @property
+ def launch_index(self):
+ return None
+ # The accessors below read self.metadata, which get_data() fills in.
+ def get_instance_id(self):
+ return self.metadata['id']
+ def get_public_ssh_keys(self):
+ return self.metadata['ssh_public_keys']
+ # The fqdn and resolve_ip arguments are accepted but not used.
+ def get_hostname(self, fqdn=False, resolve_ip=False):
+ return self.metadata['hostname']
+ def get_userdata_raw(self):
+ return self.metadata['user-data']
+ # Always None for this datasource.
+ @property
+ def availability_zone(self):
+ return None
+ # Always None for this datasource.
+ @property
+ def region(self):
+ return None
+datasources = [
+ (DataSourceScaleway, (sources.DEP_FILESYSTEM, sources.DEP_NETWORK)),
+def get_datasource_list(depends):
+ return sources.list_from_depends(depends, datasources)
diff --git a/cloudinit/ b/cloudinit/
index d2b92e6..34458ec 100644
--- a/cloudinit/
+++ b/cloudinit/
@@ -172,7 +172,8 @@ def _get_ssl_args(url, ssl_details):
def readurl(url, data=None, timeout=None, retries=0, sec_between=1,
headers=None, headers_cb=None, ssl_details=None,
- check_status=True, allow_redirects=True, exception_cb=None):
+ check_status=True, allow_redirects=True, exception_cb=None,
+ session=None):
url = _cleanurl(url)
req_args = {
'url': url,
@@ -231,7 +232,10 @@ def readurl(url, data=None, timeout=None, retries=0, sec_between=1,
LOG.debug("[%s/%s] open '%s' with %s configuration", i,
manual_tries, url, filtered_req_args)
- r = requests.request(**req_args)
+ if session is None:
+ session = requests.Session()
+ r = session.request(**req_args)
if check_status:
LOG.debug("Read from %s (%s, %sb) after %s attempts", url,
Follow ups