sts-sponsors team mailing list archive
-
sts-sponsors team
-
Mailing list archive
-
Message #05107
[Merge] ~adam-collard/maas-ci/+git/system-tests:lxd-instance-move-methods into ~maas-committers/maas-ci/+git/system-tests:master
Adam Collard has proposed merging ~adam-collard/maas-ci/+git/system-tests:lxd-instance-move-methods into ~maas-committers/maas-ci/+git/system-tests:master.
Commit message:
[lxd] Refactor CLILXD, Instance and FilesWrapper - move methods
Remove responsiblities from CLILXD, in favour of Instance
Requested reviews:
MAAS Committers (maas-committers)
For more details, see:
https://code.launchpad.net/~adam-collard/maas-ci/+git/system-tests/+merge/437336
--
Your team MAAS Committers is requested to review the proposed merge of ~adam-collard/maas-ci/+git/system-tests:lxd-instance-move-methods into ~maas-committers/maas-ci/+git/system-tests:master.
diff --git a/systemtests/fixtures.py b/systemtests/fixtures.py
index 1052736..5e7f460 100644
--- a/systemtests/fixtures.py
+++ b/systemtests/fixtures.py
@@ -666,7 +666,7 @@ def maas_client_container(
lxd = get_lxd(log)
container = os.environ.get("MAAS_SYSTEMTESTS_CLIENT_CONTAINER", "maas-client")
- instance = lxd.get_or_create(
+ instance = lxd.create_container(
container, config["containers-image"], profile=LXD_PROFILE
)
snap_channel = maas_credentials.get("snap_channel", "latest/edge")
diff --git a/systemtests/lxd.py b/systemtests/lxd.py
index a264e10..3de83b4 100644
--- a/systemtests/lxd.py
+++ b/systemtests/lxd.py
@@ -29,7 +29,11 @@ CONTAINER_USERDATA = textwrap.dedent(
class CloudInitDisabled(Exception):
- pass
+ """Raised when cloud-init erroneously reports it's disabled."""
+
+ @classmethod
+ def matches_output(cls, process: subprocess.CompletedProcess[str]) -> bool:
+ return "Cloud-init disabled by cloud-init-generator" in process.stdout
class BadWebSocketHandshakeError(Exception):
@@ -37,21 +41,37 @@ class BadWebSocketHandshakeError(Exception):
class _FileWrapper:
- def __init__(self, lxd: CLILXD, instance_name: str, path: str):
- self._lxd = lxd
- self._instance_name = instance_name
+ def __init__(self, instance: Instance, path: str):
+ self._instance = instance
self.path = path
def __repr__(self) -> str:
- return f"<FileWrapper for {self.path} on {self._instance_name}>"
+ return f"<FileWrapper for {self.path} on {self._instance}>"
def read(self) -> str:
- return self._lxd.get_file_contents(self._instance_name, self.path)
+ filename = os.path.basename(self.path)
+ with tempfile.TemporaryDirectory() as tempdir:
+ self._instance._run(
+ [
+ "lxc",
+ "file",
+ "--quiet",
+ "pull",
+ f"{self._instance.name}{self.path}",
+ f"{tempdir}/",
+ ],
+ )
+ with open(os.path.join(tempdir, filename), "r") as f:
+ return f.read()
def write(self, content: str, uid: int = 0, gid: int = 0) -> None:
- return self._lxd.push_text_file(
- self._instance_name, content, self.path, uid=uid, gid=gid
- )
+ with tempfile.NamedTemporaryFile() as source_file:
+ source_file.write(content.encode())
+ self.push(
+ Path(source_file.name),
+ uid=uid,
+ gid=gid,
+ )
def append(self, content: str) -> None:
file_content = self.read().splitlines() if self.exists() else []
@@ -61,10 +81,15 @@ class _FileWrapper:
self.write("\n".join(file_content))
def exists(self) -> bool:
- return self._lxd.file_exists(self._instance_name, self.path)
+ try:
+ self._instance.quietly_execute(["stat", self.path])
+ except subprocess.CalledProcessError:
+ return False
+ else:
+ return True
def delete(self) -> None:
- self._lxd.execute(self._instance_name, ["rm", self.path])
+ self._instance.execute(["rm", self.path])
def push(
self,
@@ -74,28 +99,41 @@ class _FileWrapper:
mode: str = "",
create_dirs: bool = False,
) -> None:
- return self._lxd.push_file(
- self._instance_name,
+ args: list[str] = [
+ "--uid",
+ str(uid),
+ "--gid",
+ str(gid),
+ "--mode",
+ mode,
str(local_path),
- self.path,
- uid=uid,
- gid=gid,
- mode=mode,
- create_dirs=create_dirs,
- )
+ f"{self._instance.name}{self.path}",
+ ]
+ if create_dirs:
+ args.append("--create-dirs")
+
+ self._instance._run(["lxc", "file", "--quiet", "push", *args])
def pull(self, local_path: str) -> None:
- self._lxd.pull_file(self._instance_name, self.path, local_path)
+ self._instance._run(
+ [
+ "lxc",
+ "file",
+ "--quiet",
+ "pull",
+ "-r",
+ f"{self._instance.name}/{self.path}",
+ local_path,
+ ],
+ )
class _FilesWrapper:
- def __init__(self, lxd: CLILXD, instance_name: str):
- super().__init__()
- self._lxd = lxd
- self._instance_name = instance_name
+ def __init__(self, instance: Instance):
+ self._instance = instance
def __getitem__(self, path: str) -> _FileWrapper:
- return _FileWrapper(self._lxd, self._instance_name, path)
+ return _FileWrapper(self._instance, path)
class Instance:
@@ -104,6 +142,7 @@ class Instance:
def __init__(self, lxd: CLILXD, instance_name: str):
self._lxd = lxd
self._instance_name = instance_name
+ self.logger = lxd.logger.getChild(instance_name)
def __repr__(self) -> str:
return f"<Instance {self.name}>"
@@ -112,67 +151,175 @@ class Instance:
def name(self) -> str:
return self._instance_name
- @property
- def logger(self) -> logging.Logger:
- return self._lxd.logger
-
- @logger.setter
- def logger(self, logger: logging.Logger) -> None:
- self._lxd.logger = logger
+ def _run(
+ self,
+ cmd: list[str],
+ prefix: Optional[list[str]] = None,
+ logger: Optional[logging.Logger] = None,
+ ) -> subprocess.CompletedProcess[str]:
+ __tracebackhide__ = True
+ if logger is None:
+ logger = self.logger
+ return run_with_logging(cmd, logger, prefix=prefix)
def exists(self) -> bool:
- return self._lxd.container_exists(self._instance_name)
+ try:
+ self._run(["lxc", "info", self._instance_name])
+ except subprocess.CalledProcessError:
+ return False
+ else:
+ return True
def create_container(
self, image: str, user_data: Optional[str] = None, profile: Optional[str] = None
) -> Instance:
return self._lxd.create_container(self.name, image, user_data, profile)
+ def _get_lxc_command(self, environment: Optional[dict[str, str]]) -> list[str]:
+ lxc_command = ["lxc", "exec", "--force-noninteractive", self._instance_name]
+ if environment is not None:
+ for key, value in environment.items():
+ lxc_command.extend(["--env", f"{key}={value}"])
+ lxc_command.append("--")
+ return lxc_command
+
+ def _run_with_logger(
+ self,
+ executor: Callable[[], subprocess.CompletedProcess[str]],
+ logger: Optional[logging.Logger],
+ ) -> subprocess.CompletedProcess[str]:
+ __tracebackhide__ = True
+
+ # Retry the run, excepting websocket: bad handshake errors
+ @retry(exceptions=BadWebSocketHandshakeError, tries=3, logger=logger)
+ def _retry_bad_handshake() -> subprocess.CompletedProcess[str]:
+ __tracebackhide__ = True
+ try:
+ result = executor()
+ except subprocess.CalledProcessError as e:
+ if e.stderr.strip().endswith("websocket: bad handshake"):
+ raise BadWebSocketHandshakeError()
+ else:
+ raise
+ else:
+ return result
+
+ return _retry_bad_handshake()
+
def execute(
self, command: list[str], environment: Optional[dict[str, str]] = None
) -> subprocess.CompletedProcess[str]:
- return self._lxd.execute(self._instance_name, command, environment=environment)
+ __tracebackhide__ = True
+ lxc_command = self._get_lxc_command(environment)
+
+ # Suppress logging of the lxc wrapper for clearer logs
+ executor = partial(self._run, command, prefix=lxc_command)
+ return self._run_with_logger(executor, self.logger)
def quietly_execute(
self, command: list[str], environment: Optional[dict[str, str]] = None
) -> subprocess.CompletedProcess[str]:
- return self._lxd.quietly_execute(
- self._instance_name, command, environment=environment
+ """Execute a command without logging it."""
+ __tracebackhide__ = True
+ lxc_command = self._get_lxc_command(environment)
+
+ executor = partial(
+ subprocess.run,
+ lxc_command + command,
+ capture_output=True,
+ check=True,
+ encoding="utf-8",
+ errors="backslashreplace",
)
+ return self._run_with_logger(executor, None)
@property
def files(self) -> _FilesWrapper:
- return _FilesWrapper(self._lxd, self._instance_name)
+ return _FilesWrapper(self)
def get_ip_address(self) -> str:
- return self._lxd.get_ip_address(self._instance_name)
+ # quieten mypy due to https://github.com/python/mypy/issues/9689
+ @retry(
+ exceptions=ValueError, tries=30, delay=2, backoff=1.1, logger=self.logger
+ ) # type:ignore
+ def _get_ip_address() -> str:
+ result = self._run(
+ ["lxc", "list", "--format", "csv", "-c4", f"name={self._instance_name}"]
+ )
+ ip, *rest = result.stdout.split()
+ return ip
+
+ return _get_ip_address()
def list_devices(self) -> list[str]:
- return self._lxd.list_instance_devices(self._instance_name)
+ result = self._run(["lxc", "config", "device", "list", self._instance_name])
+ return result.stdout.splitlines()
def add_device(self, name: str, device_config: DeviceConfig) -> None:
- return self._lxd.add_instance_device(self._instance_name, name, device_config)
+ self._run(
+ [
+ "lxc",
+ "config",
+ "device",
+ "add",
+ self._instance_name,
+ name,
+ device_config["type"],
+ ]
+ + fmt_lxd_options(device_config),
+ )
def remove_device(self, name: str) -> None:
- return self._lxd.remove_instance_device(self._instance_name, name)
+ """Remove a device from this instance."""
+
+ # quieten mypy due to https://github.com/python/mypy/issues/9689
+ @retry(
+ exceptions=subprocess.CalledProcessError,
+ tries=5,
+ delay=0.5,
+ logger=self.logger,
+ ) # type:ignore
+ def _remove_device() -> subprocess.CompletedProcess[str]:
+ return self._run(
+ ["lxc", "config", "device", "remove", self._instance_name, name],
+ )
+
+ _remove_device()
def start(self) -> subprocess.CompletedProcess[str]:
- return self._lxd.start(self._instance_name)
+ return self._run(["lxc", "start", self._instance_name])
def stop(self, force: bool = False) -> subprocess.CompletedProcess[str]:
- return self._lxd.stop(self._instance_name, force=force)
+ argv = ["lxc", "stop", self._instance_name]
+ if force:
+ argv.append("--force")
+ return self._run(argv)
- def restart(self) -> subprocess.CompletedProcess[str]:
- return self._lxd.restart(self._instance_name)
+ def restart(self, force: bool = False) -> subprocess.CompletedProcess[str]:
+ argv = ["lxc", "restart", self._instance_name]
+ if force:
+ argv.append("--force")
+ return self._run(argv)
def delete(self) -> None:
- return self._lxd.delete(self._instance_name)
-
- def is_running(self) -> bool:
- return self._lxd.is_running(self._instance_name)
+ self._run(["lxc", "delete", "--force", self._instance_name])
def status(self) -> str:
- return self._lxd.instance_status(self._instance_name)
+ result = self._run(["lxc", "info", self._instance_name])
+ for line in result.stdout.splitlines():
+ key, value = line.split(": ", 1)
+ if key == "Status":
+ return value
+ else:
+ raise ValueError(f"Unable to find Status of {self._instance_name}")
+
+ def is_running(self) -> bool:
+ try:
+ status = self.status()
+ except ValueError:
+ return False
+ else:
+ return status == "RUNNING"
class CLILXD:
@@ -181,15 +328,6 @@ class CLILXD:
def __init__(self, logger: logging.Logger):
self.logger = logger
- def container_exists(self, instance_name: str) -> bool:
- logger = self.logger.getChild(instance_name)
- try:
- self._run(["lxc", "info", instance_name], logger=logger)
- except subprocess.CalledProcessError:
- return False
- else:
- return True
-
def _run(
self,
cmd: list[str],
@@ -208,8 +346,9 @@ class CLILXD:
user_data: Optional[str] = None,
profile: Optional[str] = None,
) -> Instance:
- logger = self.logger.getChild(name)
- if not self.container_exists(name):
+ instance = Instance(self, name)
+ logger = instance.logger
+ if not instance.exists():
logger.info(f"Creating container {name} (from {image})...")
cmd = [
"lxc",
@@ -224,19 +363,23 @@ class CLILXD:
cmd.extend(["-p", profile])
cmd.append(name)
self._run(cmd, logger=logger)
- logger.info(f"Container {name} created.")
+ logger.info(f"Container {name} created.")
+ else:
+ logger.info(f"Container {name} already exists.")
logger.info("Waiting for boot to finish...")
- instance = Instance(self, name)
-
- @retry(exceptions=CloudInitDisabled, tries=120, delay=1, logger=logger)
+ # quieten mypy due to https://github.com/python/mypy/issues/9689
+ @retry(
+ exceptions=CloudInitDisabled, tries=120, delay=1, logger=logger
+ ) # type:ignore
def _cloud_init_wait() -> None:
- process = instance.execute(
+ """Wait for cloud-init to finish, and for snapd to seed."""
+ cloud_init_process = instance.execute(
["timeout", "2000", "cloud-init", "status", "--wait", "--long"]
)
- if "Cloud-init disabled by cloud-init-generator" in process.stdout:
+ if CloudInitDisabled.matches_output(cloud_init_process):
raise CloudInitDisabled("Cloud-init is disabled.")
- process = instance.execute(
+ instance.execute(
["timeout", "2000", "snap", "wait", "system", "seed.loaded"]
)
@@ -244,199 +387,6 @@ class CLILXD:
logger.info("Boot finished.")
return instance
- def get_or_create(
- self,
- name: str,
- image: str,
- user_data: Optional[str] = None,
- profile: Optional[str] = None,
- ) -> Instance:
- instance = Instance(self, name)
- if not instance.exists():
- self.create_container(name, image, user_data=user_data, profile=profile)
- return instance
-
- def push_file(
- self,
- instance_name: str,
- source_file: str,
- target_file: str,
- uid: int = 0,
- gid: int = 0,
- mode: str = "",
- create_dirs: bool = False,
- ) -> None:
- args = [
- "--uid",
- str(uid),
- "--gid",
- str(gid),
- "--mode",
- mode,
- source_file,
- f"{instance_name}{target_file}",
- ]
- if create_dirs:
- args.append("--create-dirs")
-
- logger = self.logger.getChild(instance_name)
- self._run(["lxc", "file", "--quiet", "push", *args], logger=logger)
-
- def push_text_file(
- self,
- instance_name: str,
- content: str,
- target_file: str,
- uid: int = 0,
- gid: int = 0,
- ) -> None:
- with tempfile.NamedTemporaryFile() as source_file:
- source_file.write(content.encode())
- source_file.seek(0)
- self.push_file(
- instance_name,
- source_file.name,
- target_file,
- uid=uid,
- gid=gid,
- )
-
- def file_exists(self, instance_name: str, file_path: str) -> bool:
- try:
- self.quietly_execute(instance_name, ["stat", file_path])
- except subprocess.CalledProcessError:
- return False
- else:
- return True
-
- def pull_file(
- self, instance_name: str, file_path: str, local_path: str
- ) -> subprocess.CompletedProcess[str]:
- logger = self.logger.getChild(instance_name)
- return self._run(
- [
- "lxc",
- "file",
- "--quiet",
- "pull",
- "-r",
- f"{instance_name}/{file_path}",
- local_path,
- ],
- logger=logger,
- )
-
- def get_file_contents(self, instance_name: str, file_path: str) -> str:
- logger = self.logger.getChild(instance_name)
- filename = os.path.basename(file_path)
- with tempfile.TemporaryDirectory() as tempdir:
- self._run(
- [
- "lxc",
- "file",
- "--quiet",
- "pull",
- f"{instance_name}{file_path}",
- f"{tempdir}/",
- ],
- logger=logger,
- )
- with open(os.path.join(tempdir, filename), "r") as f:
- return f.read()
-
- def _run_with_logger(
- self,
- executor: Callable[[], subprocess.CompletedProcess[str]],
- logger: Optional[logging.Logger],
- ) -> subprocess.CompletedProcess[str]:
- __tracebackhide__ = True
-
- # Retry the run, excepting websocket: bad handshake errors
- @retry(exceptions=BadWebSocketHandshakeError, tries=3, logger=logger)
- def _retry_bad_handshake() -> subprocess.CompletedProcess[str]:
- __tracebackhide__ = True
- try:
- result = executor()
- except subprocess.CalledProcessError as e:
- if e.stderr.strip().endswith("websocket: bad handshake"):
- raise BadWebSocketHandshakeError()
- else:
- raise
- else:
- return result
-
- return _retry_bad_handshake()
-
- def _get_lxc_command(
- self, instance_name: str, environment: Optional[dict[str, str]]
- ) -> list[str]:
- lxc_command = ["lxc", "exec", "--force-noninteractive", instance_name]
- if environment is not None:
- for key, value in environment.items():
- lxc_command.extend(["--env", f"{key}={value}"])
- lxc_command.append("--")
- return lxc_command
-
- def execute(
- self,
- instance_name: str,
- command: list[str],
- environment: Optional[dict[str, str]] = None,
- ) -> subprocess.CompletedProcess[str]:
- __tracebackhide__ = True
- logger = self.logger.getChild(instance_name)
- lxc_command = self._get_lxc_command(instance_name, environment)
-
- # Suppress logging of the lxc wrapper for clearer logs
- executor = partial(self._run, command, prefix=lxc_command, logger=logger)
- return self._run_with_logger(executor, logger)
-
- def quietly_execute(
- self,
- instance_name: str,
- command: list[str],
- environment: Optional[dict[str, str]] = None,
- ) -> subprocess.CompletedProcess[str]:
- """Execute a command without logging it."""
- __tracebackhide__ = True
- lxc_command = self._get_lxc_command(instance_name, environment)
-
- executor = partial(
- subprocess.run,
- lxc_command + command,
- capture_output=True,
- check=True,
- encoding="utf-8",
- errors="backslashreplace",
- )
- return self._run_with_logger(executor, None)
-
- def delete(self, instance_name: str) -> None:
- logger = self.logger.getChild(instance_name)
- self._run(["lxc", "delete", "--force", instance_name], logger=logger)
-
- def get_ip_address(self, instance_name: str) -> str:
- logger = self.logger.getChild(instance_name)
-
- @retry(exceptions=RuntimeError, tries=30, delay=2, backoff=1.1, logger=logger)
- def _get_ip_address() -> str:
- result = self._run(
- ["lxc", "list", "--format", "json", instance_name], logger=logger
- )
- # lxc list does partial match, so we still need to find the entry
- for entry in json.loads(result.stdout):
- if entry["name"] != instance_name:
- continue
- for address in entry["state"]["network"]["eth0"]["addresses"]:
- self.logger.info(f"Considering address: {address}")
- if address["family"] == "inet":
- ip: str = address["address"]
- return ip
- else:
- raise RuntimeError("Couldn't find an IP address")
-
- return _get_ip_address()
-
def profile_exists(self, profile_name: str) -> bool:
try:
self._run(
@@ -467,49 +417,6 @@ class CLILXD:
["lxc", "profile", "device", "remove", profile_name, device_name],
)
- def list_instance_devices(self, instance_name: str) -> list[str]:
- logger = self.logger.getChild(instance_name)
- result = self._run(
- ["lxc", "config", "device", "list", instance_name], logger=logger
- )
- return result.stdout.splitlines()
-
- def add_instance_device(
- self, instance_name: str, name: str, device_config: DeviceConfig
- ) -> None:
- logger = self.logger.getChild(instance_name)
- self._run(
- [
- "lxc",
- "config",
- "device",
- "add",
- instance_name,
- name,
- device_config["type"],
- ]
- + fmt_lxd_options(device_config),
- logger=logger,
- )
-
- def remove_instance_device(self, instance_name: str, device_name: str) -> None:
- """Remove a device from an instance."""
- logger = self.logger.getChild(instance_name)
-
- @retry(
- exceptions=subprocess.CalledProcessError,
- tries=5,
- delay=0.5,
- logger=self.logger,
- )
- def _remove_device() -> subprocess.CompletedProcess[str]:
- return self._run(
- ["lxc", "config", "device", "remove", instance_name, device_name],
- logger=logger,
- )
-
- _remove_device()
-
def list_instances(self) -> dict[str, dict[str, Any]]:
result = self._run(["lxc", "list", "-f", "json"])
lxc_list: list[dict[str, Any]] = json.loads(result.stdout)
@@ -540,46 +447,6 @@ class CLILXD:
)
return Instance(self, instance_name)
- def start(self, instance_name: str) -> subprocess.CompletedProcess[str]:
- logger = self.logger.getChild(instance_name)
- return self._run(["lxc", "start", instance_name], logger=logger)
-
- def stop(
- self, instance_name: str, force: bool = False
- ) -> subprocess.CompletedProcess[str]:
- logger = self.logger.getChild(instance_name)
- argv = ["lxc", "stop", instance_name]
- if force:
- argv.append("--force")
- return self._run(argv, logger=logger)
-
- def instance_status(self, instance_name: str) -> str:
- logger = self.logger.getChild(instance_name)
- result = self._run(["lxc", "info", instance_name], logger=logger)
- for line in result.stdout.splitlines():
- key, value = line.split(": ", 1)
- if key == "Status":
- return value
- else:
- raise ValueError(f"Unable to find Status of {instance_name}")
-
- def is_running(self, instance_name: str) -> bool:
- try:
- status = self.instance_status(instance_name)
- except ValueError:
- return False
- else:
- return status == "RUNNING"
-
- def restart(
- self, instance_name: str, force: bool = False
- ) -> subprocess.CompletedProcess[str]:
- logger = self.logger.getChild(instance_name)
- argv = ["lxc", "restart", instance_name]
- if force:
- argv.append("--force")
- return self._run(argv, logger=logger)
-
def fmt_lxd_options(cfg: DeviceConfig) -> list[str]:
exclude_options = ["device_name", "type"]
Follow ups