diff --git a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic22/snap/snapcraft.yaml b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic22/snap/snapcraft.yaml index 32d88b8d46..91e83e557f 100644 --- a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic22/snap/snapcraft.yaml +++ b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic22/snap/snapcraft.yaml @@ -45,6 +45,7 @@ parts: - device-tree-compiler - linuxptp - snmp + - python3-rpyc override-prime: | snapcraftctl prime rm lib/systemd/system/alsa-utils.service diff --git a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic24/snap/snapcraft.yaml b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic24/snap/snapcraft.yaml index e246a6c2e8..b94f703034 100644 --- a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic24/snap/snapcraft.yaml +++ b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic24/snap/snapcraft.yaml @@ -45,6 +45,7 @@ parts: - device-tree-compiler - linuxptp - snmp + - python3-rpyc override-prime: | craftctl default rm lib/systemd/system/alsa-utils.service diff --git a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc22/snap/snapcraft.yaml b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc22/snap/snapcraft.yaml index 58a3915d0e..2338bf1de4 100644 --- a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc22/snap/snapcraft.yaml +++ b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc22/snap/snapcraft.yaml @@ -82,6 +82,7 @@ parts: - device-tree-compiler - linuxptp - snmp + - python3-rpyc override-prime: | snapcraftctl prime rm lib/systemd/system/alsa-utils.service diff --git a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc24/snap/snapcraft.yaml b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc24/snap/snapcraft.yaml index c2e600e60e..894335b0ff 100644 --- a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc24/snap/snapcraft.yaml +++ b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc24/snap/snapcraft.yaml @@ -82,6 +82,7 @@ parts: - device-tree-compiler - linuxptp - snmp + - python3-rpyc override-prime: | craftctl default rm lib/systemd/system/alsa-utils.service diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py new file mode 100755 index 0000000000..e564c4c318 --- /dev/null +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py @@ -0,0 +1,509 @@ +#!/usr/bin/env python3 + +import argparse +import glob +import logging +import os +import subprocess +import tempfile +import time + +from importlib import import_module +from importlib.machinery import SourceFileLoader +from multiprocessing import Process +from pathlib import Path +from rpyc_client import rpyc_client +from rpyc_test_methods import configure_local_network +from typing import Union + +OTG_MODULE = "libcomposite" +CHECKBOX_RUNTIME = os.environ.get("CHECKBOX_RUNTIME", "") +CHECKBOX_BASE_PROVIDER = os.path.join( + CHECKBOX_RUNTIME, "providers/checkbox-provider-base" +) + + +logging.basicConfig(level=logging.DEBUG) + + +def initial_configfs() -> Union[tempfile.TemporaryDirectory, Path]: + """ + return a Path object with current mount point + when the kernel configfs has been mounted + Or return a TemporaryDirectory object and mount it as a kernel configfs + + Returns: + Union[tempfile.TemporaryDirectory, Path]: kernel configfs directory + """ + logging.info("initialize configfs") + ret = subprocess.check_output("mount -t configfs", shell=True, text=True) + if ret.strip(): + configfs_dir = Path(ret.split()[2]) + logging.info("kernel configfs has been mounted on %s", configfs_dir) + else: + configfs_dir = Path(tempfile.NamedTemporaryDirectory().name) + subprocess.run("mount -t configfs none {}".format(configfs_dir.name)) + logging.info("mount configfs on %s", configfs_dir.name) + return configfs_dir + + +class OtgConfigFsOperatorBase: + """ + This is a object to setup the USB gadget to support different OTG scenario + Reference https://www.kernel.org/doc/Documentation/usb/gadget_configfs.txt + """ + + OTG_FUNCTION = "" + OTG_TARGET_MODULE = "" + + def __init__(self, root_path: Path, udc_path: str, usb_type: str): + self._child_modules = self._get_child_modules() + self.root_path = root_path + self.usb_gadget_node = None + self.udc_node = Path("/sys/class/udc").joinpath(udc_path) + self.usb_type = usb_type + + def __enter__(self): + logging.info( + "Setup the OTG configurations on %s UDC node", self.udc_node + ) + if self._child_modules: + logging.info(self._child_modules) + # To clean up the OTG modules + self.disable_otg_related_modules(self._child_modules) + self.enable_otg_module([OTG_MODULE]) + self.usb_gadget_node = Path( + tempfile.TemporaryDirectory( + dir=self.root_path.joinpath("usb_gadget"), prefix="udc_" + ).name + ) + self.otg_setup() + self.create_otg_configs() + self.create_otg_function() + return self + + def __exit__(self, exec, value, tb): + logging.debug("Clean up OTG configurations") + self._cleanup_usb_gadget() + cur_modules = [ + mod + for mod in self._get_child_modules() + if mod not in self._child_modules + ] + self.disable_otg_related_modules(cur_modules) + if self._child_modules: + self.enable_otg_module(self._child_modules) + self.otg_teardown() + + def _get_child_modules(self): + output = subprocess.check_output( + "lsmod | awk '/^libcomposite/ {print $4}'", + shell=True, + text=True, + universal_newlines=True, + ) + return output.strip("\n").split(",") if output.strip("\n") else [] + + def enable_otg_module(self, modules): + for module in modules: + subprocess.run( + "modprobe {}".format(module), shell=True, check=True + ) + + def disable_otg_related_modules(self, modules): + for module in modules: + subprocess.run( + "modprobe -r {}".format(module), shell=True, check=True + ) + + def otg_setup(self): + """ + This is function for doing any extra step for specific OTG function + such as collecting ethernet interface, serial interface + and create an USB image file + """ + pass + + def otg_teardown(self): + """ + This is function for doing any extra step for specific OTG function + such as delete an USB image file + """ + pass + + def create_otg_configs(self): + logging.info("create USB gadget") + path_lang = self.usb_gadget_node.joinpath("strings", "0x409") + path_lang.mkdir() + + vid_file = self.usb_gadget_node.joinpath("idVendor") + vid_file.write_text("0xabcd") + pid_file = self.usb_gadget_node.joinpath("idProduct") + pid_file.write_text("0x9999") + + # create configs + udc_configs = self.usb_gadget_node.joinpath("configs", "c.1") + udc_configs.mkdir() + max_power_file = udc_configs.joinpath("MaxPower") + max_power_file.write_text("120") + + def create_otg_function(self): + logging.info("create function") + self.enable_otg_module([self.OTG_TARGET_MODULE]) + func_name = "{}.0".format(self.OTG_FUNCTION) + function_path = self.usb_gadget_node.joinpath("functions", func_name) + + if not function_path.exists(): + function_path.mkdir() + + self.usb_gadget_node.joinpath("configs", "c.1", func_name).symlink_to( + function_path, True + ) + + def _cleanup_usb_gadget(self): + func_name = "{}.0".format(self.OTG_FUNCTION) + self.usb_gadget_node.joinpath("strings", "0x409").rmdir() + self.usb_gadget_node.joinpath("configs", "c.1", func_name).unlink(True) + self.usb_gadget_node.joinpath("configs", "c.1").rmdir() + self.usb_gadget_node.joinpath("functions", func_name).rmdir() + self.usb_gadget_node.rmdir() + + def enable_otg(self): + if self.udc_node.exists(): + self.usb_gadget_node.joinpath("UDC").write_text(self.udc_node.name) + else: + logging.error("UDC node '%s' not exists", self.udc_node) + raise ValueError(self.udc_node) + + def disable_otg(self): + self.usb_gadget_node.joinpath("UDC").write_text("") + + def self_check(self): + """ensure the USB device been generated. + + Returns: + bool: return True when a USB device been detected + """ + logging.debug("check") + return True + + def detection_check_on_rpyc(self, rpyc_ip): + """ + This is a function to detect OTG device on client + """ + pass + + def function_check_with_rpyc(self, rpyc_ip): + """ + this is a function to perform OTG testing on client + """ + pass + + +class OtgMassStorageSetup(OtgConfigFsOperatorBase): + + OTG_FUNCTION = "mass_storage" + OTG_TARGET_MODULE = "usb_f_mass_storage" + + def otg_setup(self): + """ + This is function for doing any extra step for specific OTG function + such as collecting ethernet interface, serial interface + and create an USB storage + """ + logging.info("Create an USB image file for Mass Storage Test") + self._usb_img = tempfile.NamedTemporaryFile("+bw", delete=False) + subprocess.run( + "dd if=/dev/zero of={} bs=1M count=1024".format( + self._usb_img.name + ), + shell=True, + check=True, + ) + subprocess.run( + "mkdosfs -F 32 {}".format(self._usb_img.name), + shell=True, + check=True, + ) + logging.info("%s file been created", self._usb_img.name) + + def otg_teardown(self): + logging.info("Delete USB image file from %s", self._usb_img.name) + os.remove(self._usb_img.name) + + def create_otg_function(self): + logging.info("create function") + self.enable_otg_module([self.OTG_TARGET_MODULE]) + func_name = "{}.0".format(self.OTG_FUNCTION) + function_path = self.usb_gadget_node.joinpath("functions", func_name) + + if not function_path.exists(): + function_path.mkdir() + + function_path.joinpath("lun.0", "file").write_text(self._usb_img.name) + + self.usb_gadget_node.joinpath("configs", "c.1", func_name).symlink_to( + function_path, True + ) + + def detection_check_on_rpyc(self, rpyc_ip): + logging.info("USB drive detection on RPYC") + mounted_drive = rpyc_client(rpyc_ip, "usb_drive_check", self.usb_type) + if mounted_drive: + logging.info( + "Found USB device and mounted as '%s' on rpyc server", + mounted_drive, + ) + else: + raise RuntimeError("No USB device found on rpyc server") + self._target_dev = mounted_drive + + def function_check_with_rpyc(self, rpyc_ip): + logging.info("USB read/write testing on RPYC") + raise SystemExit( + rpyc_client( + rpyc_ip, + "usb_storage_test", + self.usb_type, + ) + ) + + def otg_test_process(self, rpyc_ip): + logging.info("Start Mass Storage Testing with OTG interface") + t_thread = Process( + target=self.function_check_with_rpyc, args=(rpyc_ip,) + ) + t_thread.start() + logging.debug("Launch USB detection and storage tests on RPYC server") + # Sleep few seconds to activate USB detection on RPYC server + time.sleep(3) + self.enable_otg() + t_thread.join() + self.disable_otg() + + if t_thread.exitcode == 0: + logging.info("OTG Mass Storage test passed") + else: + logging.debug("Exit code: %s", t_thread.exitcode) + raise RuntimeError("OTG Mass Storage test failed") + + +class OtgEthernetSetup(OtgConfigFsOperatorBase): + + OTG_FUNCTION = "ecm" + OTG_TARGET_MODULE = "usb_f_ecm" + + def _collect_net_intfs(self): + return [ + os.path.basename(intf) for intf in glob.glob("/sys/class/net/*") + ] + + def otg_setup(self): + self._net_intfs = self._collect_net_intfs() + + def self_check(self): + """ + Ensure the ethernet device been generated by usb gadget + + Returns: + bool: Return True when an USB Ethernet interface been detected + """ + logging.info("Validate a new network interface been generated") + cur_net_intfs = self._collect_net_intfs() + if len(cur_net_intfs) == len(self._net_intfs): + raise RuntimeError("OTG network interface not available") + + otg_net_intf = [x for x in cur_net_intfs if x not in self._net_intfs] + if len(otg_net_intf) != 1: + logging.error( + "Found more than one new interface. %s", otg_net_intf + ) + else: + logging.info("Found new network interface '%s'", otg_net_intf[0]) + self._net_dev = otg_net_intf[0] + + def detection_check_on_rpyc(self, rpyc_ip): + logging.info("Network interface detection on RPYC") + ret = rpyc_client(rpyc_ip, "ethernet_check") + if ret: + logging.info("Found %s network interface on rpyc server", ret) + else: + raise RuntimeError("No network interface found on rpyc server") + self._target_net_dev = list(ret)[0] + + def function_check_with_rpyc(self, rpyc_ip): + configure_local_network(self._net_dev, "169.254.0.1/24") + logging.info("Configure the %s network on RPYC", self._target_net_dev) + rpyc_client( + rpyc_ip, + "configure_local_network", + self._target_net_dev, + "169.254.0.10/24", + ) + logging.info("Ping from DUT to Target") + _module = SourceFileLoader( + "_", + os.path.join(CHECKBOX_BASE_PROVIDER, "bin/gateway_ping_test.py"), + ).load_module() + test_func = getattr(_module, "perform_ping_test") + ret = test_func([self._net_dev], "169.254.0.10") + if ret != 0: + raise RuntimeError("Failed to ping DUT from RPYC server") + + def otg_test_process(self, rpyc_ip): + self.enable_otg() + self.self_check() + self.detection_check_on_rpyc(rpyc_ip) + self.function_check_with_rpyc(rpyc_ip) + self.disable_otg() + + +class OtgSerialSetup(OtgConfigFsOperatorBase): + + OTG_FUNCTION = "acm" + OTG_TARGET_MODULE = "usb_f_acm" + + def _collect_serial_intfs(self): + return [os.path.basename(intf) for intf in glob.glob("/dev/ttyGS*")] + + def otg_setup(self): + self._ser_intfs = self._collect_serial_intfs() + + def self_check(self): + """ + Ensure a Serial device been generated by usb gadget + + Returns: + bool: Return True when a Serial interface been detected + """ + logging.info("Validate a new serial interface been generated") + cur_ser_intfs = self._collect_serial_intfs() + if len(cur_ser_intfs) == len(self._ser_intfs): + raise RuntimeError("OTG network interface not available") + + otg_ser_intf = [x for x in cur_ser_intfs if x not in self._ser_intfs] + if len(otg_ser_intf) != 1: + logging.error( + "Found more than one new interface. %s", otg_ser_intf + ) + else: + logging.info("Found new network interface '%s'", otg_ser_intf[0]) + self._serial_iface = otg_ser_intf[0] + + def detection_check_on_rpyc(self, rpyc_ip): + ret = rpyc_client(rpyc_ip, "serial_check") + if ret: + logging.info("Found %s serial interface on rpyc server", ret) + else: + logging.debug("No serial interface found on rpyc server") + self._target_serial_dev = list(ret)[0] + + def function_check_with_rpyc(self, rpyc_ip): + logging.info("perform serial client test on DUT") + func = getattr(import_module("serial_test"), "client_mode") + func( + "/dev/{}".format(self._serial_iface), + "USB", + [], + 115200, + 8, + "N", + 1, + 3, + 1024, + ) + + def otg_test_process(self, rpyc_ip): + self.enable_otg() + self.self_check() + self.detection_check_on_rpyc(rpyc_ip) + + try: + logging.info("start serial server on rpyc server") + t_thread = Process( + target=rpyc_client, + args=( + rpyc_ip, + "enable_serial_server", + "/dev/serial/by-id/{}".format(self._target_serial_dev), + "USB", + [], + 115200, + 8, + "N", + 1, + 3, + 1024, + ), + ) + t_thread.start() + time.sleep(3) + self.function_check_with_rpyc(rpyc_ip) + except SystemExit as err: + logging.debug(err) + finally: + t_thread.kill() + self.disable_otg() + + +OTG_TESTING_MAPPING = { + "mass_storage": OtgMassStorageSetup, + "ethernet": OtgEthernetSetup, + "serial": OtgSerialSetup, +} + + +def otg_testing(udc_node, test_func, rpyc_ip, usb_type): + configfs_dir = initial_configfs() + with OTG_TESTING_MAPPING[test_func]( + configfs_dir, udc_node, usb_type + ) as otg_cfg: + otg_cfg.otg_test_process(rpyc_ip) + + +def dump_otg_info(configs): + for config in configs.split(): + otg_conf = config.split(":") + if len(otg_conf) == 3: + print("USB_CONNECTOR: {}".format(otg_conf[0])) + print("UDC_NODE: {}".format(otg_conf[1])) + print("USB_TYPE: {}".format(otg_conf[2])) + print() + + +def register_arguments(): + parser = argparse.ArgumentParser(description="OTG test method") + + sub_parser = parser.add_subparsers( + dest="mode", + required=True, + ) + test_parser = sub_parser.add_parser("test") + test_parser.add_argument( + "-t", + "--type", + required=True, + choices=["mass_storage", "ethernet", "serial"], + ) + test_parser.add_argument("-u", "--udc-node", required=True, type=str) + test_parser.add_argument( + "--usb-type", default="usb2", type=str, choices=["usb2", "usb3"] + ) + test_parser.add_argument("--rpyc-address", required=True, type=str) + + info_parser = sub_parser.add_parser("info") + info_parser.add_argument("-c", "--config", required=True, type=str) + + return parser.parse_args() + + +def main(): + args = register_arguments() + if args.mode == "test": + otg_testing(args.udc_node, args.type, args.rpyc_address, args.usb_type) + elif args.mode == "info": + dump_otg_info(args.config) + + +if __name__ == "__main__": + main() diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py new file mode 100755 index 0000000000..c678c43221 --- /dev/null +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py @@ -0,0 +1,50 @@ +import time +import rpyc + + +def rpyc_client(host, cmd, *args, **kwargs): + """ + Run command on RPYC. + + :param host: RPYC server IP address + :param cmd: command to be executed + :param args: command arguments + :param kwargs: command keyword arguments + :returns: whatever is returned by RPYC service + :raises SystemExit: if the connection cannot be established + or the command is unknown + or a service error occurs + """ + for _ in range(2): + try: + conn = rpyc.connect( + host, + 60000, + config={"allow_all_attrs": True, "allow_exposed_attrs": False}, + ) + break + except ConnectionRefusedError: + time.sleep(1) + else: + raise SystemExit("Cannot connect to RPYC Host.") + + try: + func = getattr(conn.root, cmd) + wrap = rpyc.async_(func) + res = wrap(*args, **kwargs) + while res.ready: + print("Waiting for RPYC server complete {}".format(func)) + time.sleep(1) + break + if getattr(res._conn.root, "logs"): + print(res._conn.root.logs) + return res.value + # return getattr(conn.root, cmd)(*args, **kwargs) + except AttributeError: + raise SystemExit( + "RPYC host does not provide a '{}' command.".format(cmd) + ) + except rpyc.core.vinegar.GenericException as exc: + raise SystemExit( + "Zapper host failed to process the requested command." + ) from exc diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py new file mode 100755 index 0000000000..3619162974 --- /dev/null +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py @@ -0,0 +1,120 @@ +import io +import logging +import os +import rpyc +import sys + +from contextlib import redirect_stdout, redirect_stderr +from importlib.machinery import SourceFileLoader +from rpyc.utils.server import ThreadedServer + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +handler_std = logging.StreamHandler(sys.stdout) + +CHECKBOX_PROVIDER_CEOEM_PATH = ( + "/snap/checkbox-ce-oem/current/providers/checkbox-provider-ce-oem/bin" +) +LIBS = { + "enable_serial_server": { + "source": os.path.join(CHECKBOX_PROVIDER_CEOEM_PATH, "serial_test.py"), + "function": "server_mode", + }, + "serial_check": { + "source": os.path.join( + CHECKBOX_PROVIDER_CEOEM_PATH, "rpyc_test_methods.py" + ), + "function": "serial_check", + }, + "ethernet_check": { + "source": os.path.join( + CHECKBOX_PROVIDER_CEOEM_PATH, "rpyc_test_methods.py" + ), + "function": "ethernet_check", + }, + "configure_local_network": { + "source": os.path.join( + CHECKBOX_PROVIDER_CEOEM_PATH, "rpyc_test_methods.py" + ), + "function": "configure_local_network", + }, + "usb_storage_test": { + "source": os.path.join( + CHECKBOX_PROVIDER_CEOEM_PATH, "rpyc_test_methods.py" + ), + "function": "usb_storage_test", + }, +} +dynamic_integrate_funcs = [LIBS[k]["function"] for k in LIBS.keys()] + + +def capture_io_logs(func): + def wrap(*args, **kwargs): + sio_stdout = io.StringIO() + sio_stderr = io.StringIO() + cls = args[0] + if func.__name__ in dynamic_integrate_funcs: + args = args[1:] + + with redirect_stderr(sio_stderr) as stderr, redirect_stdout( + sio_stdout + ) as stdout: + try: + ret = func(*args, **kwargs) + except SystemExit as exp: + logging.error(exp.code) + ret = None + + cls.logs = "stdout logs: {}".format(stdout.getvalue()) + cls.logs += "\nstderr logs: {}".format(stderr.getvalue()) + return ret + + return wrap + + +def _load_method_from_file(name, file, func): + try: + _module = SourceFileLoader(name, file).load_module() + return getattr(_module, func) + except FileNotFoundError as e: + logger.error("Failed to import module from %s file", file) + logger.debug(e) + except AttributeError as e: + logger.error("Failed to get %s function from %s file", func, file) + logger.debug(e) + + +def append_method_to_service(cls): + for key, value in LIBS.items(): + func = _load_method_from_file(key, value["source"], value["function"]) + if func: + setattr(cls, key, capture_io_logs(func)) + + return cls + + +class RpycTestService(rpyc.Service): + + def __init__(self): + super().__init__() + self.logs = "" + + +def main(): + rpyc_service = append_method_to_service(RpycTestService) + + t = ThreadedServer( + rpyc_service, + port=60000, + logger=logger, + protocol_config={ + "allow_all_attrs": True, + "allow_exposed_attrs": False, + }, + ) + t.start() + + +if __name__ == "__main__": + main() diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py new file mode 100755 index 0000000000..d22af8d3b6 --- /dev/null +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py @@ -0,0 +1,94 @@ +import logging +import subprocess +import time + +from importlib import reload +from pathlib import Path +from checkbox_support.scripts import ( + run_watcher, + usb_read_write, +) + + +# Method for testing +def _get_ethernet_ifaces(): + return set([i.name for i in Path("/sys/class/net").iterdir()]) + + +def _get_serial_ifaces(): + return set([i.name for i in Path("/dev/serial/by-id").iterdir()]) + + +def _device_node_detect(func, device_type=""): + starting_ifaces = func() + + attempts = 20 + while attempts > 0: + now_ifaces = func() + # check if something disappeared + if not starting_ifaces == now_ifaces & starting_ifaces: + raise SystemExit( + "Interface(s) disappeared: {}".format( + ", ".join(list(starting_ifaces - now_ifaces)) + ) + ) + new_ifaces = now_ifaces - starting_ifaces + if new_ifaces: + print() + print( + "New interface(s) detected: {}".format( + ", ".join(list(new_ifaces)) + ) + ) + return new_ifaces + time.sleep(1) + print(".", end="", flush=True) + attempts -= 1 + print() + raise SystemExit("Failed to detect new {} interface".format(device_type)) + + +def configure_local_network(interface, net_info): + logging.info("Turn down the link of %s interface", interface) + subprocess.check_output( + "ip link set dev {} down".format(interface), + shell=True, + text=True, + ) + logging.info("Turn down the link of %s interface", interface) + subprocess.check_output( + "ip addr add {} dev {}".format(net_info, interface), + shell=True, + text=True, + ) + logging.info("Turn up the link of %s interface", interface) + subprocess.check_output( + "ip link set dev {} up".format(interface), + shell=True, + text=True, + ) + + +def serial_check(): + return _device_node_detect(_get_serial_ifaces, "serial") + + +def ethernet_check(): + return _device_node_detect(_get_ethernet_ifaces, "ethernet") + + +def usb_storage_test(usb_type): + logging.info("%s drive read/write testing on RPYC", usb_type) + usb_read_write.REPETITION_NUM = 2 + watcher = run_watcher.USBStorage(usb_type) + mounted_partition = watcher.run_insertion() + logging.info( + "%s drive been mounted to %s on RPYC", usb_type, mounted_partition + ) + watcher.run_storage(mounted_partition) + # usb_read_write been imported in run_watcher + # the temporary mount point been created while import it + # and the directory been removed in gen_random_file function + # thus, we need to reload it to create temporary mount point + # in every testing cycle + reload(usb_read_write) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py index 42bda9e548..91bb7c0e29 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py @@ -130,7 +130,18 @@ def generate_random_string(length): return "".join(random.choice(letters) for _ in range(length)) -def server_mode(ser: Serial) -> None: +def server_mode( + node, + type, + group, + baudrate, + bytesize, + parity, + stopbits, + timeout, + datasize, + rs485_settings, +) -> None: """ Running as a server, it will be sniffing for received string. And it will send the same string out. @@ -138,6 +149,18 @@ def server_mode(ser: Serial) -> None: running on port /dev/ttyUSB0 as a server $ sudo ./serial_test.py /dev/ttyUSB0 --mode server --type USB """ + ser = Serial( + node, + type, + group, + baudrate, + bytesize, + parity, + stopbits, + timeout, + datasize, + rs485_settings, + ) logging.info("Listening on port {} ...".format(ser.node)) while True: data = ser.recv() @@ -148,7 +171,18 @@ def server_mode(ser: Serial) -> None: logging.info("Listening on port {} ...".format(ser.node)) -def client_mode(ser: Serial, data_size: int = 1024): +def client_mode( + node, + type, + group, + baudrate, + bytesize, + parity, + stopbits, + timeout, + datasize, + rs485_settings, +): """ Running as a clinet and it will sending out a string and wait the string send back from server. After receive the string, @@ -157,7 +191,19 @@ def client_mode(ser: Serial, data_size: int = 1024): running on port /dev/ttymxc1 as a client $ sudo ./serial_test.py /dev/ttymxc1 --mode client --type RS485 """ - random_string = generate_random_string(data_size) + ser = Serial( + node, + type, + group, + baudrate, + bytesize, + parity, + stopbits, + timeout, + datasize, + rs485_settings, + ) + random_string = generate_random_string(datasize) ser.send(random_string.encode()) for i in range(1, 6): logging.info("Attempting receive string... {} time".format(i)) @@ -174,13 +220,36 @@ def client_mode(ser: Serial, data_size: int = 1024): raise SystemExit(1) -def console_mode(ser: Serial): +def console_mode( + node, + type, + group, + baudrate, + bytesize, + parity, + stopbits, + timeout, + datasize, + rs485_settings, +): """ Test the serial port when it is in console mode This test requires DUT to loop back it self. For example: connect the serial console port to the USB port via serial to usb dongle """ + ser = Serial( + node, + type, + group, + baudrate, + bytesize, + parity, + stopbits, + timeout, + datasize, + rs485_settings, + ) try: # Send 'Enter Key' logging.info("Sending 'Enter Key'...") @@ -329,25 +398,48 @@ def main(): "delay_before_tx": args.rts_delay_before_tx, "delay_before_rx": args.rts_delay_before_rx, } - ser = Serial( - args.node, - args.type, - args.group, - baudrate=args.baudrate, - bytesize=args.bytesize, - parity=args.parity, - stopbits=args.stopbits, - timeout=args.timeout, - data_size=args.datasize, - rs485_settings=rs485_settings, - ) + else: + rs485_settings = None if args.mode == "server": - server_mode(ser) + server_mode( + args.node, + args.type, + args.group, + args.baudrate, + args.bytesize, + args.parity, + args.stopbits, + args.timeout, + args.datasize, + rs485_settings, + ) elif args.mode == "client": - client_mode(ser, data_size=args.datasize) + client_mode( + args.node, + args.type, + args.group, + args.baudrate, + args.bytesize, + args.parity, + args.stopbits, + args.timeout, + args.datasize, + rs485_settings, + ) elif args.mode == "console": - console_mode(ser) + console_mode( + args.node, + args.type, + args.group, + args.baudrate, + args.bytesize, + args.parity, + args.stopbits, + args.timeout, + args.datasize, + rs485_settings, + ) else: raise SystemExit(1) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/jobs.pxu b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/jobs.pxu index 8aababc448..ae69d49229 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/jobs.pxu +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/jobs.pxu @@ -24,15 +24,15 @@ id: ce-oem-otg/g_serial-{USB_port} estimated_duration: 60 plugin: user-interact-verify user: root -imports: +imports: from com.canonical.plainbox import manifest from com.canonical.certification import cpuinfo -requires: +requires: manifest.has_otg == 'True' cpuinfo.platform in ("aarch64", "armv7l") flags: also-after-suspend _summary: Check {USB_port} can be detected as a serial device -_purpose: +_purpose: Check that after connecting the device under test (DUT) to another device (host), {USB_port} can be detected as a serial device by the host. _steps: @@ -50,11 +50,11 @@ _steps: 7. Check the string sending from the host to {USB_port} by repeating steps 4-6, but swap the send and receive sides. _verification: Does string send and receive work? -command: +command: # shellcheck disable=SC2050 if [ {Mode} != "otg" ]; then echo -e "Error: USB mode is {Mode} mode, but expected in otg mode." - exit 1 + exit 1 fi multiple-otg.sh -u {UDC} -f acm @@ -62,14 +62,14 @@ unit: template template-resource: otg_ports template-unit: job category_id: com.canonical.plainbox::usb -id: ce-oem-otg/g_mass_storage-{USB_port} +id: ce-oem-otg/g_mass_storage-{USB_port} estimated_duration: 60 plugin: user-interact-verify user: root -imports: +imports: from com.canonical.plainbox import manifest from com.canonical.certification import cpuinfo -requires: +requires: manifest.has_otg == 'True' cpuinfo.platform in ("aarch64", "armv7l") flags: also-after-suspend @@ -79,16 +79,16 @@ _purpose: (host), {USB_port} can be detected as a mass storage device by the host. _steps: 1. Press Enter to setup a 16 MB FAT32 image on the device. - 2. From an Ubuntu host, connect a cable to the USB OTG port + 2. From an Ubuntu host, connect a cable to the USB OTG port (it's the USB connector) of the DUT. _verification: - The host detects and mounts a mass storage device. It has read and write + The host detects and mounts a mass storage device. It has read and write permissions on it. command: # shellcheck disable=SC2050 if [ {Mode} != "otg" ]; then echo -e "Error: USB mode is {Mode} mode, but expected in otg mode." - exit 1 + exit 1 fi multiple-otg.sh -u {UDC} -f mass_storage @@ -99,23 +99,23 @@ category_id: com.canonical.plainbox::usb id: ce-oem-otg/g_ether-{USB_port} plugin: user-interact-verify user: root -imports: +imports: from com.canonical.plainbox import manifest from com.canonical.certification import cpuinfo -requires: +requires: manifest.has_otg == 'True' cpuinfo.platform in ("aarch64", "armv7l") flags: also-after-suspend _summary: Check {USB_port} can be detected as USB ethernet device. -_purpose: +_purpose: Check that after connecting the device under test (DUT) to another device (host), {USB_port} can be detected as a USB ethernet device by the host. _steps: - 1. From an Ubuntu host, connect a cable to the USB OTG port + 1. From an Ubuntu host, connect a cable to the USB OTG port (it's the USB connector) of the {USB_port}. 2. Press Enter to config the IP address for the USB OTG ethernet interface on the DUT. (IP will be 192.168.9.1/24) - 3. On the host, config the IP address for the USB OTG ethernet interface. And it should be + 3. On the host, config the IP address for the USB OTG ethernet interface. And it should be the same subnet with {USB_port}. (eg. 192.168.9.2/24) 4. Try to ping between the host and {USB_port}. _verification: @@ -124,6 +124,128 @@ command: # shellcheck disable=SC2050 if [ {Mode} != "otg" ]; then echo -e "Error: USB mode is {Mode} mode, but expected in otg mode." - exit 1 + exit 1 fi multiple-otg.sh -u {UDC} -f ecm + + +id: otg_port_mapping +category_id: com.canonical.plainbox::usb +plugin: resource +_summary: Gather list of USB ports and UDC. +_description: + A USB port and UDC mapping resource that relies on the user specifying in config varirable. + Usage of parameter: USB_OTG_MAPPING={port1}:{udc_node1}:{usb_type1} {port2}:{udc_node2}:{usb_type2} ... + e.g. USB_OTG_MAPPING=USB-C1:11200000.usb:usb2 USB-Micro:112a1000.usb:usb3 +estimated_duration: 1s +environ: USB_OTG_MAPPING +flags: preserve-locale +user: root +command: + if [ "$USB_OTG_MAPPING" ]; then + multiple_otg.py info -c "$USB_OTG_MAPPING" + else + echo "USB_OTG_MAPPING config variable: not found" + fi + +unit: template +template-resource: otg_port_mapping +template-unit: job +template-engine: jinja2 +template-id: ce-oem-otg/otg-network-test +_template-summary: + OTG USB drive read/write test +_purpose: + Validate DUT can be detected as a network device through OTG {{ USB_CONNETOR }} and can ping to RPYC Server +_description: + The test will setup the OTG configuration within {{ UDC_NODE }} on the device under test (DUT) + Then setup the USB network interface on both DUT and RPYC server and perform ping test on DUT +category_id: com.canonical.plainbox::usb +id: ce-oem-otg/OTG-Ethernet-on-port-{{ USB_CONNECTOR }} +estimated_duration: 60 +plugin: shell +user: root +environ: + RYPC_SERVER +imports: + from com.canonical.plainbox import manifest + from com.canonical.certification import cpuinfo +requires: + manifest.has_otg == 'True' + manifest.has_rpyc_otg_server == 'True' + cpuinfo.platform in ("aarch64", "armv7l") +flags: also-after-suspend +command: + if [ -z "$RPYC_SERVER" ]; then + echo "Error: RPYC_SERVER is not defined" + exit 1 + fi + multiple_otg.py test -t ethernet -u {{ UDC_NODE }} --rpyc-address "$RPYC_SERVER" --usb-type {{ USB_TYPE }} + +unit: template +template-resource: otg_port_mapping +template-unit: job +template-engine: jinja2 +template-id: ce-oem-otg/otg-serial-test +_template-summary: + OTG USB drive read/write test +_purpose: + Validate DUT can be detected as a serial device through OTG {{ USB_CONNETOR }} and transfer data via OTG Serial +_description: + The test will setup the OTG configuration within {{ UDC_NODE }} on the device under test (DUT) + Then start serial server on RPYC server and perform data read/write test via serial interface +category_id: com.canonical.plainbox::usb +id: ce-oem-otg/OTG-Serial-on-port-{{ USB_CONNECTOR }} +estimated_duration: 60 +plugin: shell +user: root +environ: + RYPC_SERVER +imports: + from com.canonical.plainbox import manifest + from com.canonical.certification import cpuinfo +requires: + manifest.has_otg == 'True' + manifest.has_rpyc_otg_server == 'True' + cpuinfo.platform in ("aarch64", "armv7l") +flags: also-after-suspend +command: + if [ -z "$RPYC_SERVER" ]; then + echo "Error: RPYC_SERVER is not defined" + exit 1 + fi + multiple_otg.py test -t serial -u {{ UDC_NODE }} --rpyc-address "$RPYC_SERVER" --usb-type {{ USB_TYPE }} + +unit: template +template-resource: otg_port_mapping +template-unit: job +template-engine: jinja2 +template-id: ce-oem-otg/otg-mass-storage-test +_template-summary: + OTG USB drive read/write test +_purpose: + Validate DUT can be detected as a mass storage device through OTG {{ USB_CONNETOR }} and be able to access +_description: + The test will setup the OTG configuration within {{ UDC_NODE }} on the device under test (DUT) + the USB interface will be detected as a mass storage and perform data read/write test +category_id: com.canonical.plainbox::usb +id: ce-oem-otg/OTG-Mass-Storage-on-port-{{ USB_CONNECTOR }} +estimated_duration: 60 +plugin: shell +user: root +environ: + RYPC_SERVER +imports: + from com.canonical.plainbox import manifest + from com.canonical.certification import cpuinfo +requires: + manifest.has_otg == 'True' + manifest.has_rpyc_otg_server == 'True' + cpuinfo.platform in ("aarch64", "armv7l") +flags: also-after-suspend +command: + if [ -z "$RPYC_SERVER" ]; then + echo "Error: RPYC_SERVER is not defined" + exit 1 + fi + multiple_otg.py test -t mass_storage -u {{ UDC_NODE }} --rpyc-address "$RPYC_SERVER" --usb-type {{ USB_TYPE }} diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/manifest.pxu b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/manifest.pxu index 57ee1f2c65..75e651a687 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/manifest.pxu +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/manifest.pxu @@ -1,4 +1,9 @@ unit: manifest entry id: has_otg _name: Does platfrom supported USB OTG? -value-type: bool \ No newline at end of file +value-type: bool + +unit: manifest entry +id: has_rpyc_otg_server +_name: Does RPYC server with OTG test services been setup? +value-type: bool diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/test-plan.pxu b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/test-plan.pxu index f107fa9cca..417c7d6095 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/test-plan.pxu +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/test-plan.pxu @@ -22,7 +22,12 @@ id: ce-oem-otg-automated unit: test plan _name: OTG auto tests _description: Automated OTG tests for devices +bootstrap_include: + otg_port_mapping include: + ce-oem-otg/otg-network-test + ce-oem-otg/otg-serial-test + ce-oem-otg/otg-mass-storage-test id: after-suspend-ce-oem-otg-manual unit: test plan