feat: speedup integration tests
This makes the integration tests start faster by building all the charms and deploying the test servers concurrently.
Additionally, we stop depending on LXD for the servers, opting to deploy Juju applications and run commands inside them when required.
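
As an illustration, here is a minimal sketch (assumed test code, not part of this commit) of how the new helpers let the expensive setup steps run in parallel; ops_test is the standard pytest-operator fixture, and the helper names come from the helpers.py diff below:

import asyncio

from helpers import build_and_deploy_charm, bootstrap_microceph, bootstrap_nfs_server


async def test_setup(ops_test):
    # Build and deploy the charm while both storage servers bootstrap
    # concurrently, instead of waiting for each step in sequence.
    _, nfs_info, cephfs_info = await asyncio.gather(
        build_and_deploy_charm(
            ops_test, ops_test.build_charm("."), application_name="filesystem-client"
        ),
        bootstrap_nfs_server(ops_test),
        bootstrap_microceph(ops_test),
    )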
jedel1043 authored and NucciTheBoss committed Jan 22, 2025
1 parent acc5171 commit 5622152
Showing 5 changed files with 141 additions and 229 deletions.
2 changes: 0 additions & 2 deletions pyproject.toml
@@ -21,12 +21,10 @@ dev = [

# Integration
"juju ~= 3.3",
"pylxd ~= 2.3",
"pytest ~= 7.2",
"pytest-operator ~= 0.34",
"pytest-order ~= 1.1",
"tenacity ~= 8.2",
"requests ~= 2.31.0", # TODO: track https://github.com/psf/requests/issues/6707
]

[tool.uv.workspace]
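The pylxd dependency removed above is superseded by python-libjuju's machine SSH API, which the reworked helpers use to run commands on the deployed servers. A minimal sketch of the call (mirroring the _exec_cmd helper added in helpers.py below; machine is a juju.machine.Machine taken from a deployed unit):

# Run a command on the Juju machine over SSH and capture its stdout;
# wait_for_active blocks until the machine is reachable.
output = await machine.ssh("uname -r", wait_for_active=True)
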
1 change: 1 addition & 0 deletions tests/integration/conftest.py
@@ -6,6 +6,7 @@
from pytest_operator.plugin import OpsTest

logger = logging.getLogger(__name__)

FILESYSTEM_CLIENT_DIR = (
Path(filesystem_client) if (filesystem_client := os.getenv("FILESYSTEM_CLIENT_DIR")) else None
)
242 changes: 86 additions & 156 deletions tests/integration/helpers.py
@@ -6,9 +6,12 @@
import json
import logging
import textwrap
from typing import Any
from collections.abc import Awaitable
from pathlib import Path

from pylxd import Client
import juju
from juju import machine
from pytest_operator.plugin import OpsTest
from tenacity import retry, stop_after_attempt, wait_exponential

from charms.filesystem_client.v0.filesystem_info import CephfsInfo, NfsInfo
@@ -20,68 +23,50 @@
CEPH_PATH = "/"


def _exec_command(instance: Any, cmd: [str]) -> str:
_logger.info(f"Executing `{' '.join(cmd)}`")
code, stdout, stderr = instance.execute(cmd, environment={"DEBIAN_FRONTEND": "noninteractive"})
if code != 0:
_logger.error(stderr)
raise Exception(f"Failed to execute command `{' '.join(cmd)}` in instance")
_logger.info(stdout)
async def _exec_cmd(machine: machine.Machine, cmd: str) -> str:
_logger.info("Executing `%s`", cmd)
stdout = await machine.ssh(f"sudo bash -c '{cmd.replace("'", "'\\''")}'", wait_for_active=True)
if stdout:
_logger.info("stdout: %s", stdout)

return stdout


def _exec_commands(instance: Any, cmds: [[str]]) -> None:
async def _exec_cmds(machine: machine.Machine, cmds: [str]) -> None:
for cmd in cmds:
_exec_command(instance, cmd)
await _exec_cmd(machine, cmd)


async def build_and_deploy_charm(
ops_test: OpsTest, charm: Awaitable[str | Path], *deploy_args, **deploy_kwargs
):
"""Build and deploy the charm identified by `charm`."""
charm = await charm
deploy_kwargs["channel"] = "edge" if isinstance(charm, str) else None
await ops_test.model.deploy(str(charm), *deploy_args, **deploy_kwargs)


def bootstrap_nfs_server() -> NfsInfo:
"""Bootstrap a minimal NFS kernel server in LXD.
async def bootstrap_nfs_server(ops_test: OpsTest) -> NfsInfo:
"""Bootstrap a minimal NFS kernel server in Juju.
Returns:
NfsInfo: Information to mount the NFS share.
"""
client = Client()

if client.instances.exists("nfs-server"):
_logger.info("NFS server already exists")
instance = client.instances.get("nfs-server")
address = instance.state().network["enp5s0"]["addresses"][0]["address"]
_logger.info(f"NFS share endpoint is nfs://{address}/data")
return NfsInfo(hostname=address, port=None, path="/data")

_logger.info("Bootstrapping minimal NFS kernel server")
config = {
"name": "nfs-server",
"source": {
"alias": "noble/amd64",
"mode": "pull",
"protocol": "simplestreams",
"server": "https://cloud-images.ubuntu.com/releases",
"type": "image",
},
"type": "virtual-machine",
}
client.instances.create(config, wait=True)
instance = client.instances.get(config["name"])
instance.start(wait=True)
_logger.info("Installing NFS server inside LXD container")

_wait_for_machine(instance)

_exec_commands(
instance,
[
["apt", "update", "-y"],
["apt", "upgrade", "-y"],
],
await ops_test.model.deploy(
"ubuntu",
application_name="nfs-server",
base="[email protected]",
constraints=juju.constraints.parse("virt-type=virtual-machine"),
)
await ops_test.model.wait_for_idle(
apps=["nfs-server"],
status="active",
timeout=1000,
)

# Restart in case there was a kernel update.
instance.restart(wait=True)
_wait_for_machine(instance)
machine = ops_test.model.applications["nfs-server"].units[0].machine

_exec_command(instance, ["apt", "-y", "install", "nfs-kernel-server"])
await _exec_cmd(machine, "apt -y install nfs-kernel-server")

exports = textwrap.dedent(
"""
@@ -90,143 +75,88 @@ def bootstrap_nfs_server() -> NfsInfo:
"""
).strip("\n")
_logger.info(f"Uploading the following /etc/exports file:\n{exports}")
instance.files.put("/etc/exports", exports)
await _exec_cmd(machine, f'echo -e "{exports.replace("\n", "\\n")}" > /etc/exports')
_logger.info("Starting NFS server")
_exec_commands(
instance,
await _exec_cmds(
machine,
[
["mkdir", "-p", "/data"],
["exportfs", "-a"],
["systemctl", "restart", "nfs-kernel-server"],
"mkdir -p /data",
"exportfs -a",
"systemctl restart nfs-kernel-server",
],
)
for i in [1, 2, 3]:
_exec_command(instance, ["touch", f"/data/test-{i}"])
address = instance.state().network["enp5s0"]["addresses"][0]["address"]
await _exec_cmd(machine, f"touch /data/test-{i}")
address = (await _exec_cmd(machine, "hostname")).strip()
_logger.info(f"NFS share endpoint is nfs://{address}/data")
return NfsInfo(hostname=address, port=None, path="/data")


def bootstrap_microceph() -> CephfsInfo:
"""Bootstrap a minimal Microceph cluster in LXD.
async def bootstrap_microceph(ops_test: OpsTest) -> CephfsInfo:
"""Bootstrap a minimal Microceph cluster in Juju.
Returns:
CephfsInfo: Information to mount the CephFS share.
"""
client = Client()
if client.instances.exists("microceph"):
_logger.info("Microceph server already exists")
instance = client.instances.get("microceph")
return _get_cephfs_info(instance)

_logger.info("Bootstrapping Microceph cluster")
config = {
"name": "microceph",
"source": {
"alias": "jammy/amd64",
"mode": "pull",
"protocol": "simplestreams",
"server": "https://cloud-images.ubuntu.com/releases",
"type": "image",
},
"type": "virtual-machine",
}
client.instances.create(config, wait=True)
instance = client.instances.get(config["name"])
instance.start(wait=True)
_logger.info("Installing Microceph inside LXD container")

_wait_for_machine(instance)

_exec_commands(
instance,
[
["ln", "-s", "/bin/true"],
["apt", "update", "-y"],
["apt", "upgrade", "-y"],
["apt", "install", "-y", "ceph-common"],
],

await ops_test.model.deploy(
"microceph",
application_name="microceph",
base="[email protected]",
channel="squid/beta",
constraints=juju.constraints.parse("mem=4G root-disk=20G virt-type=virtual-machine"),
num_units=1,
storage={"osd-standalone": juju.constraints.parse_storage_constraint("loop,3,1G")},
)
await ops_test.model.wait_for_idle(
apps=["microceph"],
status="active",
timeout=1000,
)

# Restart in case there was a kernel update.
instance.restart(wait=True)
_wait_for_machine(instance)
machine = ops_test.model.applications["microceph"].units[0].machine

_exec_commands(
instance,
await _wait_for_ceph(machine)

await _exec_cmds(
machine,
[
["snap", "install", "microceph"],
["microceph", "cluster", "bootstrap"],
["microceph", "disk", "add", "loop,1G,3"],
["microceph.ceph", "osd", "pool", "create", f"{CEPH_FS_NAME}_data"],
["microceph.ceph", "osd", "pool", "create", f"{CEPH_FS_NAME}_metadata"],
[
"microceph.ceph",
"fs",
"new",
CEPH_FS_NAME,
f"{CEPH_FS_NAME}_metadata",
f"{CEPH_FS_NAME}_data",
],
[
"microceph.ceph",
"fs",
"authorize",
CEPH_FS_NAME,
f"client.{CEPH_USERNAME}",
CEPH_PATH,
"rw",
],
# Need to generate the test files inside microceph itself.
[
"ln",
"-sf",
"/var/snap/microceph/current/conf/ceph.client.admin.keyring",
"/etc/ceph/ceph.client.admin.keyring",
],
[
"ln",
"-sf",
"/var/snap/microceph/current/conf/ceph.keyring",
"/etc/ceph/ceph.keyring",
],
["ln", "-sf", "/var/snap/microceph/current/conf/ceph.conf", "/etc/ceph/ceph.conf"],
"ln -s /bin/true",
"apt install -y ceph-common",
f"microceph.ceph osd pool create {CEPH_FS_NAME}_data",
f"microceph.ceph osd pool create {CEPH_FS_NAME}_metadata",
f"microceph.ceph fs new {CEPH_FS_NAME} {CEPH_FS_NAME}_metadata {CEPH_FS_NAME}_data",
f"microceph.ceph fs authorize {CEPH_FS_NAME} client.{CEPH_USERNAME} {CEPH_PATH} rw",
"ln -sf /var/snap/microceph/current/conf/ceph.client.admin.keyring /etc/ceph/ceph.client.admin.keyring",
"ln -sf /var/snap/microceph/current/conf/ceph.keyring /etc/ceph/ceph.keyring",
"ln -sf /var/snap/microceph/current/conf/ceph.conf /etc/ceph/ceph.conf",
],
)

_mount_cephfs(instance)
await _wait_for_ceph(machine)
await _exec_cmd(machine, f"mount -t ceph admin@.{CEPH_FS_NAME}={CEPH_PATH} /mnt")

for i in [1, 2, 3]:
_exec_command(instance, ["touch", f"/mnt/test-{i}"])

return _get_cephfs_info(instance)
await _exec_cmd(machine, f"touch /mnt/test-{i}")

return await _get_cephfs_info(machine)

@retry(wait=wait_exponential(max=6), stop=stop_after_attempt(10))
def _wait_for_machine(instance) -> None:
# Need to extract this into its own function to apply the tenacity decorator
_exec_command(instance, ["echo"])
_exec_command(instance, ["systemctl", "is-active", "snapd", "--quiet"])


@retry(wait=wait_exponential(max=6), stop=stop_after_attempt(20))
def _mount_cephfs(instance) -> None:
# Need to extract this into its own function to apply the tenacity decorator
@retry(wait=wait_exponential(max=10), stop=stop_after_attempt(20))
async def _wait_for_ceph(machine: machine.Machine) -> None:
# Wait until the cluster is ready to mount the filesystem.
status = json.loads(_exec_command(instance, ["microceph.ceph", "-s", "-f", "json"]))
status = json.loads(await _exec_cmd(machine, "microceph.ceph -s -f json"))
if status["health"]["status"] != "HEALTH_OK":
raise Exception("CephFS is not available")

_exec_command(instance, ["mount", "-t", "ceph", f"admin@.{CEPH_FS_NAME}={CEPH_PATH}", "/mnt"])


def _get_cephfs_info(instance: Any) -> CephfsInfo:
status = json.loads(_exec_command(instance, ["microceph.ceph", "-s", "-f", "json"]))
async def _get_cephfs_info(machine: machine.Machine) -> CephfsInfo:
status = json.loads(await _exec_cmd(machine, "microceph.ceph -s -f json"))
fsid = status["fsid"]
host = instance.state().network["enp5s0"]["addresses"][0]["address"] + ":6789"
key = _exec_command(
instance, ["microceph.ceph", "auth", "print-key", f"client.{CEPH_USERNAME}"]
)
host = (await _exec_cmd(machine, "hostname")).strip() + ":6789"
key = await _exec_cmd(machine, f"microceph.ceph auth print-key client.{CEPH_USERNAME}")

return CephfsInfo(
fsid=fsid,
name=CEPH_FS_NAME,
(Diff truncated; the remaining changed files are not shown.)
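
One detail worth noting in the new _exec_cmd helper: it wraps every command in sudo bash -c '...', escaping embedded single quotes with the POSIX close-escape-reopen idiom. A self-contained illustration (hypothetical helper name, mirroring the replace() call used above):

def quote_for_bash(cmd: str) -> str:
    # Each ' in cmd becomes '\'' (close the quote, emit an escaped quote,
    # reopen), so the command survives as one single-quoted argument.
    return "sudo bash -c '" + cmd.replace("'", "'\\''") + "'"

print(quote_for_bash("echo 'hello'"))  # sudo bash -c 'echo '\''hello'\'''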
