From 9b2057caa8bbfd417e56a9eec8093ee464869e20 Mon Sep 17 00:00:00 2001 From: stanley31 Date: Wed, 30 Oct 2024 12:32:51 +0100 Subject: [PATCH 01/17] add rpyc client add rpyc client --- .../bin/rpyc_client.py | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100755 contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py new file mode 100755 index 0000000000..ec4f6d8a87 --- /dev/null +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py @@ -0,0 +1,46 @@ +import time + +from importlib import import_module + + +def rpyc_client(host, cmd, *args, **kwargs): + """ + Run command on RPYC. + + :param host: RPYC server IP address + :param cmd: command to be executed + :param args: command arguments + :param kwargs: command keyword arguments + :returns: whatever is returned by RPYC service + :raises SystemExit: if the connection cannot be established + or the command is unknown + or a service error occurs + """ + try: + _rpyc = import_module("rpyc") + except ImportError: + try: + _rpyc = import_module("plainbox.vendor.rpyc") + except ImportError as exc: + msg = "RPyC not found. Neither from sys nor from Checkbox" + raise SystemExit(msg) from exc + + for _ in range(2): + try: + conn = _rpyc.connect(host, 60000, config={"allow_all_attrs": True}) + break + except ConnectionRefusedError: + time.sleep(1) + else: + raise SystemExit("Cannot connect to RPYC Host.") + + try: + return getattr(conn.root, cmd)(*args, **kwargs) + except AttributeError: + raise SystemExit( + "RPYC host does not provide a '{}' command.".format(cmd) + ) + except _rpyc.core.vinegar.GenericException as exc: + raise SystemExit( + "Zapper host failed to process the requested command." + ) from exc \ No newline at end of file From c386b395207c70b3c7bc2abd3ee1280ffd7863f9 Mon Sep 17 00:00:00 2001 From: stanley31 Date: Thu, 31 Oct 2024 11:39:21 +0100 Subject: [PATCH 02/17] add otg test python script add otg test python script --- .../bin/multiple_otg.py | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100755 contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py new file mode 100755 index 0000000000..2fb66b72fe --- /dev/null +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py @@ -0,0 +1,97 @@ +import logging +import os +import subprocess + +from contextlib import contextmanager +from pathlib import Path + + +MODULE_MAPPING = { + "usb": "usb_f_mass_storage", + "ethernet": "usb_f_ecm", + "serial": "usb_f_acm", +} +OTG_MODULE = "libcomposite" +GADGET_PATH = "/sys/kernel/config/usb_gadget" + + +def _get_otg_module(): + ret = subprocess.run( + "lsmod | grep {}".format(OTG_MODULE), + shell=True, + universal_newlines=True + ) + return ret.stdout.strip() + + +def enable_otg_module(): + disable_otg_related_modules() + + if not _get_otg_module(): + subprocess.run("modprode {}".format(OTG_MODULE)) + + +def disable_otg_related_modules(): + ret = subprocess.run("lsmod | awk '/^libcomposite/ {print $4}'") + if ret.returncode == 0: + for module in ret.stdout.split(","): + subprocess.run("modprobe -r {}".format(module)) + + +def _initial_gadget(): + logging.info("initial gadget") + enable_otg_module() + + ret = subprocess.run("mount | configfs") + if not ret.stdout.strip(): + subprocess.run( + "mount -t configfs none {}".format(os.path.split(GADGET_PATH)) + ) + +def _create_otg_configs(): + logging.info("create gadget") + path_g1 = os.path.join(GADGET_PATH, "g1") + os.makedirs(path_g1) + + path_lang = os.path.join(path_g1, "strings", "0x409") # english lang + os.makedirs(path_lang) + + path_g1 = Path(path_g1) + vid_file = path_g1.joinpath("idVendor") + vid_file.write_text("0xabcd") + pid_file = path_g1.joinpath("idProduct") + pid_file.write_text("0x9999") + + # create configs + path_config = path_g1.joinpath("configs", "c.1") + os.makedirs(path_config) + + max_power_file = path_config.joinpath("MaxPower") + max_power_file.write_text("120") + + +def otg_testing(method): + pass + + +def teardown(): + pass + +@contextmanager +def prepare_env(): + try: + _initial_gadget() + _create_otg_configs() + except Exception as err: + logging.error(err) + finally: + teardown() + + +def main(): + with prepare_env(): + otg_testing() + + +if __name__ == "__main__": + main() \ No newline at end of file From 996cda6d676aa9eecec81202557f67143d28a291 Mon Sep 17 00:00:00 2001 From: stanley31 Date: Thu, 31 Oct 2024 12:16:46 +0100 Subject: [PATCH 03/17] update main function update main function --- .../bin/multiple_otg.py | 40 ++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py index 2fb66b72fe..6a044fad91 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py @@ -1,5 +1,7 @@ +import argparse import logging import os +import shutil import subprocess from contextlib import contextmanager @@ -70,12 +72,12 @@ def _create_otg_configs(): max_power_file.write_text("120") -def otg_testing(method): - pass +def teardown(): + path_obj = Path(GADGET_PATH).joinpath("g1", "UDC") + path_obj.write_text("") + shutil.rmtree(GADGET_PATH) -def teardown(): - pass @contextmanager def prepare_env(): @@ -88,9 +90,37 @@ def prepare_env(): teardown() +class OtgTest(): + + def mass_storage_test(self): + pass + + def ethernet_test(self): + pass + + def serial_test(self): + pass + + +def register_arguments(): + parser = argparse.ArgumentParser( + description="OTG test method" + ) + + parser.add_argument( + "-t", + "--type", + required=True, + choices=["mass_storage", "ethernet", "serial"] + ) + + return parser.parse_args() + + def main(): + args = register_arguments() with prepare_env(): - otg_testing() + getattr(OtgTest, args.type)(args) if __name__ == "__main__": From 54fd541b583ac7def22729251b6a48d39e69727e Mon Sep 17 00:00:00 2001 From: stanley31 Date: Thu, 31 Oct 2024 12:30:10 +0100 Subject: [PATCH 04/17] update test method update test method --- .../bin/multiple_otg.py | 29 +++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py index 6a044fad91..e6cb6b7df7 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py @@ -90,15 +90,22 @@ def prepare_env(): teardown() +def dump_otg_info(configs): + pass + + class OtgTest(): - def mass_storage_test(self): + def info(self): + pass + + def mass_storage(self): pass - def ethernet_test(self): + def ethernet(self): pass - def serial_test(self): + def serial(self): pass @@ -107,12 +114,21 @@ def register_arguments(): description="OTG test method" ) - parser.add_argument( + sub_parser = parser.add_subparsers( + dest="mode", + required=True, + ) + test_parser = sub_parser.add_parser("test") + test_parser.add_argument( "-t", "--type", required=True, choices=["mass_storage", "ethernet", "serial"] ) + test_parser.add_argument("-a", "--address", required=True, type=str) + + info_parser = sub_parser.add_parser("info") + info_parser.add_argument("-c", "--config", required=True, type=str) return parser.parse_args() @@ -120,7 +136,10 @@ def register_arguments(): def main(): args = register_arguments() with prepare_env(): - getattr(OtgTest, args.type)(args) + if args.mode == "test": + getattr(OtgTest, args.type)(args) + elif args.mode == "info": + dump_otg_info(args.config) if __name__ == "__main__": From 4881523515a2b961ea715ede276dac2bc516bb89 Mon Sep 17 00:00:00 2001 From: weichenwu Date: Thu, 31 Oct 2024 15:10:05 +0100 Subject: [PATCH 05/17] add create otg function --- .../bin/multiple_otg.py | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py index e6cb6b7df7..52254cc8f5 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py @@ -3,6 +3,7 @@ import os import shutil import subprocess +import tempfile from contextlib import contextmanager from pathlib import Path @@ -72,6 +73,39 @@ def _create_otg_configs(): max_power_file.write_text("120") +def _create_function(function): + logging.info("create function") + subprocess.run("modprobe usb_f_{}".format(function)) + function_path = os.path.join( + GADGET_PATH, + "g1", + "functions", + "{}.0".format(function) + ) + config_path = os.path.join(GADGET_PATH, "g1", "configs", "c.1") + + if not os.path.isdir(function_path): + os.makedirs(function_path) + + if function == "mass_storage": + with tempfile.TemporaryDirectory() as tmp_dir: + img = os.path.join(tmp_dir, "lun0.img") + subprocess.run("dd if=/dev/zero of={} bs=1M count=16".format(img)) + subprocess.run("mkdosfs -F 32 {}".format(img)) + + with open(os.path.join(function_path, "lun.0", "file"), "w") as f: + f.write(img) + + os.symlink( + function_path, + os.path.join(config_path, "{}.0".format(function)) + ) + + +def otg_testing(method): + pass + + def teardown(): path_obj = Path(GADGET_PATH).joinpath("g1", "UDC") path_obj.write_text("") @@ -79,11 +113,13 @@ def teardown(): shutil.rmtree(GADGET_PATH) + @contextmanager def prepare_env(): try: _initial_gadget() _create_otg_configs() + _create_function() except Exception as err: logging.error(err) finally: From 3ed70485e09cb8ac19e958aa993994e266f1d9c9 Mon Sep 17 00:00:00 2001 From: stanley31 Date: Fri, 1 Nov 2024 10:44:13 +0100 Subject: [PATCH 06/17] update arguments update arguments --- .../bin/multiple_otg.py | 95 +++++++++++-------- 1 file changed, 58 insertions(+), 37 deletions(-) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py index 52254cc8f5..1630e21b55 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py @@ -1,6 +1,8 @@ import argparse +import glob import logging import os +import re import shutil import subprocess import tempfile @@ -16,6 +18,9 @@ } OTG_MODULE = "libcomposite" GADGET_PATH = "/sys/kernel/config/usb_gadget" +UDC_G1_NODE = Path(GADGET_PATH).joinpath("g1") +UDC_CONFIG = UDC_G1_NODE.joinpath("configs", "c.1") +UDC_NODE = UDC_G1_NODE.joinpath("UDC") def _get_otg_module(): @@ -51,38 +56,29 @@ def _initial_gadget(): "mount -t configfs none {}".format(os.path.split(GADGET_PATH)) ) + def _create_otg_configs(): logging.info("create gadget") - path_g1 = os.path.join(GADGET_PATH, "g1") - os.makedirs(path_g1) + os.makedirs(UDC_G1_NODE.name) - path_lang = os.path.join(path_g1, "strings", "0x409") # english lang - os.makedirs(path_lang) + path_lang = UDC_G1_NODE.joinpath("strings", "0x409") + os.makedirs(path_lang.name) # english language - path_g1 = Path(path_g1) - vid_file = path_g1.joinpath("idVendor") + vid_file = UDC_G1_NODE.joinpath("idVendor") vid_file.write_text("0xabcd") - pid_file = path_g1.joinpath("idProduct") + pid_file = UDC_G1_NODE.joinpath("idProduct") pid_file.write_text("0x9999") # create configs - path_config = path_g1.joinpath("configs", "c.1") - os.makedirs(path_config) - - max_power_file = path_config.joinpath("MaxPower") + os.makedirs(UDC_CONFIG.name) + max_power_file = UDC_CONFIG.joinpath("MaxPower") max_power_file.write_text("120") def _create_function(function): logging.info("create function") subprocess.run("modprobe usb_f_{}".format(function)) - function_path = os.path.join( - GADGET_PATH, - "g1", - "functions", - "{}.0".format(function) - ) - config_path = os.path.join(GADGET_PATH, "g1", "configs", "c.1") + function_path = UDC_G1_NODE.joinpath("functions", "{}.0".format(function)) if not os.path.isdir(function_path): os.makedirs(function_path) @@ -98,7 +94,7 @@ def _create_function(function): os.symlink( function_path, - os.path.join(config_path, "{}.0".format(function)) + UDC_CONFIG.joinpath("{}.0".format(function)).name ) @@ -107,41 +103,66 @@ def otg_testing(method): def teardown(): - path_obj = Path(GADGET_PATH).joinpath("g1", "UDC") - path_obj.write_text("") - + UDC_NODE.write_text("") shutil.rmtree(GADGET_PATH) - @contextmanager -def prepare_env(): +def prepare_env(mode, address): try: _initial_gadget() _create_otg_configs() - _create_function() + _create_function(mode) + # Activate OTG + UDC_NODE.write_text(address) except Exception as err: logging.error(err) finally: teardown() +def _identify_udc_bus(otg_bus, udc_list): + for udc in udc_list: + if udc in otg_bus: + return udc + elif glob.glob( + "/sys/devices/platform/**/{}/{}*".format(udc, otg_bus) + ): + return udc + return "None" + + def dump_otg_info(configs): - pass + otg_nodes = glob.glob( + "/sys/firmware/devicetree/base/**/dr_mode", recursive=True + ) + udc_list = [os.path.basename(f) for f in glob.glob("/sys/class/udc/*")] + otg_mapping = {} + for node in otg_nodes: + mode = Path(node).read_text().strip() + usb_bus = re.search(r"usb@([a-z0-9]*)", node) + otg_mapping[usb_bus] = mode + + for config in configs.split(): + otg_conf = config.split(":") + if len(otg_conf) == 2: + udc_bus = _identify_udc_bus(otg_conf[1], udc_list) + print("USB_port: {}".format(otg_conf[0])) + print("USB_node: {}".format(otg_conf[1])) + print("Mode: {}".format(otg_mapping.get(config[0], ""))) + print("UDC: {}".format(udc_bus)) + print() class OtgTest(): - def info(self): - pass - - def mass_storage(self): + def mass_storage(self, type, address): pass - def ethernet(self): + def ethernet(self, type, address): pass - def serial(self): + def serial(self, type, address): pass @@ -171,11 +192,11 @@ def register_arguments(): def main(): args = register_arguments() - with prepare_env(): - if args.mode == "test": - getattr(OtgTest, args.type)(args) - elif args.mode == "info": - dump_otg_info(args.config) + if args.mode == "test": + with prepare_env(): + getattr(OtgTest, args.type)(args.type, args.address) + elif args.mode == "info": + dump_otg_info(args.config) if __name__ == "__main__": From ed7d0aed464c4e69e0910de101005d007025f9cd Mon Sep 17 00:00:00 2001 From: stanley31 Date: Thu, 31 Oct 2024 15:32:33 +0100 Subject: [PATCH 07/17] update arguments update arguments --- .../bin/multiple_otg.py | 149 +++++++++++++++--- 1 file changed, 128 insertions(+), 21 deletions(-) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py index 1630e21b55..4cf23309bf 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py @@ -9,6 +9,7 @@ from contextlib import contextmanager from pathlib import Path +from rpyc_client import rpyc_client MODULE_MAPPING = { @@ -102,25 +103,6 @@ def otg_testing(method): pass -def teardown(): - UDC_NODE.write_text("") - shutil.rmtree(GADGET_PATH) - - -@contextmanager -def prepare_env(mode, address): - try: - _initial_gadget() - _create_otg_configs() - _create_function(mode) - # Activate OTG - UDC_NODE.write_text(address) - except Exception as err: - logging.error(err) - finally: - teardown() - - def _identify_udc_bus(otg_bus, udc_list): for udc in udc_list: if udc in otg_bus: @@ -154,11 +136,136 @@ def dump_otg_info(configs): print() -class OtgTest(): +class ConfigFsOperator(tempfile.TemporaryDirectory): + + def __enter__(self): + subprocess.run( + "mount -t configfs none {}".format(self.name), + shell=True, + check=True + ) + super.__init__() + + def __exit__(self, exc, value, tb): + subprocess.run("umount {}".format(self.name)) + super.__exit__(exc, value, tb) + + def create_otg_gadget(self): + gadget_root = Path(self.name, "usb_gadget") + gadget_root.mkdir() + self.gadget_node = gadget_root.joinpath("g1") + self.gadget_node.mkdir() + + # create PID and VID file + self.gadget_node.joinpath("idVendor").write_text("0xabcd") + self.gadget_node.joinpath("idProduct").write_text("0x9999") + + # create serial no, manufacture and product + string_dir = self.gadget_node.joinpath("strings") + string_dir.mkdir() + lang_dir = string_dir.joinpath("0x409") + lang_dir.mkdir() + lang_dir.joinpath("serialnumber").write_text("1234567") + lang_dir.joinpath("manufacturer").write_text("canonical") + lang_dir.joinpath("product").write_text("otg_device") + + def create_otg_config(self): + config_root = self.gadget_node.joinpath("configs") + config_root.mkdir() + config_node = config_root.joinpath("c.1") + config_node.mkdir() + + config_node.joinpath("MaxPower").write_text("120") + string_dir = config_node.joinpath("strings") + string_dir.mkdir() + lang_dir = string_dir.joinpath("0x409") + lang_dir.mkdir() + lang_dir.joinpath("configuration").write_text("otg") + + def create_otg_function(self): + pass + +class OtgTestBase(): + """ + This is a object to setup the USB gadget to support different OTG scenario + Reference https://www.kernel.org/doc/Documentation/usb/gadget_configfs.txt + """ + def __init__(self, bus_addr): + self._addr = bus_addr + + def _get_related_libcomposite_modules(self): + ret = subprocess.run("lsmod | awk '/^libcomposite/ {print $4}'") + if ret.returncode == 0: + return ret.stdout.split(",") + return [] + + def _enable_libcomposite_module(self): + modules = self._get_related_libcomposite_modules() + if modules: + # libcomposite module has been loaded, unload corresponding module + for module in modules: + subprocess.run("modprobe -r {}".format(module), check=True) + else: + # load libcomposite + subprocess.run("modprobe {}".format(module), check=True) + + def _identify_configfs_dir(self): + + + def _pre_setup_env(self): + self._enable_libcomposite_module() + + def create_gadget(self): + pass + + def create_config(self): + pass + + def create_function(self): + pass - def mass_storage(self, type, address): + def associate_function_and_config(self): pass + def enable_gadget(self): + pass + + def disable_gadget(self): + pass + + def clean_up(self): + pass + + +class OtgTest(): + + def __init__(self, mode, address): + self._mode = mode + self._address = address + + def __enter__(self): + self._prepare_env() + + def __exit__(self): + try: + UDC_NODE.write_text("") + shutil.rmtree(GADGET_PATH) + except Exception as err: + logging.error(err) + + def _prepare_env(self): + _initial_gadget() + _create_otg_configs() + _create_function(self._mode) + + def activate_otg(self): + # Activate OTG + UDC_NODE.write_text(self._address) + + @classmethod + def mass_storage(cls, type, address): + rpyc_client() + def ethernet(self, type, address): pass From 1cb996fa5fc8b8e21b4c7215e719369c04488d97 Mon Sep 17 00:00:00 2001 From: stanley31huang Date: Tue, 24 Dec 2024 17:14:05 +0800 Subject: [PATCH 08/17] refactor the OTG testing script refactor the OTG testing script --- .../bin/multiple_otg.py | 514 ++++++++++-------- 1 file changed, 296 insertions(+), 218 deletions(-) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py index 4cf23309bf..40b5cc1799 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py @@ -2,281 +2,359 @@ import glob import logging import os -import re -import shutil import subprocess import tempfile +import time -from contextlib import contextmanager +from importlib import import_module +from multiprocessing import Process from pathlib import Path from rpyc_client import rpyc_client +from typing import Union - -MODULE_MAPPING = { - "usb": "usb_f_mass_storage", - "ethernet": "usb_f_ecm", - "serial": "usb_f_acm", -} OTG_MODULE = "libcomposite" -GADGET_PATH = "/sys/kernel/config/usb_gadget" -UDC_G1_NODE = Path(GADGET_PATH).joinpath("g1") -UDC_CONFIG = UDC_G1_NODE.joinpath("configs", "c.1") -UDC_NODE = UDC_G1_NODE.joinpath("UDC") +logging.basicConfig(level=logging.DEBUG) -def _get_otg_module(): - ret = subprocess.run( - "lsmod | grep {}".format(OTG_MODULE), - shell=True, - universal_newlines=True - ) - return ret.stdout.strip() +def initial_configfs() -> Union[tempfile.TemporaryDirectory, Path]: + """ + return a Path object with current mount point + when the kernel configfs has been mounted + Or return a TemporaryDirectory object and mount it as a kernel configfs -def enable_otg_module(): - disable_otg_related_modules() - - if not _get_otg_module(): - subprocess.run("modprode {}".format(OTG_MODULE)) + Returns: + Union[tempfile.TemporaryDirectory, Path]: kernel configfs directory + """ + logging.info("initialize configfs") + ret = subprocess.check_output("mount -t configfs", shell=True, text=True) + if ret.strip(): + configfs_dir = Path(ret.split()[2]) + logging.info("kernel configfs has been mounted on %s", configfs_dir) + else: + configfs_dir = Path(tempfile.NamedTemporaryDirectory().name) + subprocess.run("mount -t configfs none {}".format(configfs_dir.name)) + logging.info("mount configfs on %s", configfs_dir.name) + return configfs_dir -def disable_otg_related_modules(): - ret = subprocess.run("lsmod | awk '/^libcomposite/ {print $4}'") - if ret.returncode == 0: - for module in ret.stdout.split(","): - subprocess.run("modprobe -r {}".format(module)) +def dump_otg_info(configs): + for config in configs.split(): + otg_conf = config.split(":") + if len(otg_conf) == 2: + print("USB_CONNECTOR: {}".format(otg_conf[0])) + print("USB_BUS: {}".format(otg_conf[1])) + print() -def _initial_gadget(): - logging.info("initial gadget") - enable_otg_module() +class OtgConfigFsOperatorBase: + """ + This is a object to setup the USB gadget to support different OTG scenario + Reference https://www.kernel.org/doc/Documentation/usb/gadget_configfs.txt + """ - ret = subprocess.run("mount | configfs") - if not ret.stdout.strip(): - subprocess.run( - "mount -t configfs none {}".format(os.path.split(GADGET_PATH)) - ) + OTG_FUNCTION = "" + OTG_TARGET_MODULE = "" + def __init__(self, root_path: Path, udc_path: str): + self._child_modules = self._get_child_modules() + self.root_path = root_path + self.usb_gadget_node = None + self.udc_node = Path("/sys/class/udc").joinpath(udc_path) -def _create_otg_configs(): - logging.info("create gadget") - os.makedirs(UDC_G1_NODE.name) + def __enter__(self): + logging.debug("enter setup function") + if self._child_modules: + logging.info(self._child_modules) + # To clean up the OTG modules + self.disable_otg_related_modules(self._child_modules) + self.enable_otg_module([OTG_MODULE]) + self.usb_gadget_node = Path( + tempfile.TemporaryDirectory( + dir=self.root_path.joinpath("usb_gadget"), + prefix="udc_" + ).name + ) + self.otg_setup() + self.create_otg_configs() + self.create_otg_function() + return self + + def __exit__(self, exec, value, tb): + logging.debug("enter teardown function") + self._cleanup_usb_gadget() + cur_modules = [ + mod for mod in self._get_child_modules() if mod not in self._child_modules] + self.disable_otg_related_modules(cur_modules) + if self._child_modules: + self.enable_otg_module(self._child_modules) + self.otg_teardown() + + def _get_child_modules(self): + output = subprocess.check_output( + "lsmod | awk '/^libcomposite/ {print $4}'", + shell=True, + text=True, + universal_newlines=True, + ) + return output.strip("\n").split(",") if output.strip("\n") else [] + + def enable_otg_module(self, modules): + for module in modules: + subprocess.run("modprobe {}".format(module), shell=True, check=True) + + def disable_otg_related_modules(self, modules): + for module in modules: + subprocess.run("modprobe -r {}".format(module), shell=True, check=True) + + def otg_setup(self): + """ + This is function for doing any extra step for specific OTG function + such as collecting ethernet interface, serial interface + and create an USB image file + """ + pass - path_lang = UDC_G1_NODE.joinpath("strings", "0x409") - os.makedirs(path_lang.name) # english language + def otg_teardown(self): + """ + This is function for doing any extra step for specific OTG function + such as delete an USB image file + """ + pass - vid_file = UDC_G1_NODE.joinpath("idVendor") - vid_file.write_text("0xabcd") - pid_file = UDC_G1_NODE.joinpath("idProduct") - pid_file.write_text("0x9999") + def create_otg_configs(self): + logging.info("create USB gadget") + path_lang = self.usb_gadget_node.joinpath("strings", "0x409") + path_lang.mkdir() - # create configs - os.makedirs(UDC_CONFIG.name) - max_power_file = UDC_CONFIG.joinpath("MaxPower") - max_power_file.write_text("120") + vid_file = self.usb_gadget_node.joinpath("idVendor") + vid_file.write_text("0xabcd") + pid_file = self.usb_gadget_node.joinpath("idProduct") + pid_file.write_text("0x9999") + # create configs + udc_configs = self.usb_gadget_node.joinpath("configs", "c.1") + udc_configs.mkdir() + max_power_file = udc_configs.joinpath("MaxPower") + max_power_file.write_text("120") -def _create_function(function): - logging.info("create function") - subprocess.run("modprobe usb_f_{}".format(function)) - function_path = UDC_G1_NODE.joinpath("functions", "{}.0".format(function)) + def create_otg_function(self): + logging.info("create function") + self.enable_otg_module([self.OTG_TARGET_MODULE]) + func_name = "{}.0".format(self.OTG_FUNCTION) + function_path = self.usb_gadget_node.joinpath("functions", func_name) + + if not function_path.exists(): + function_path.mkdir() + + self.usb_gadget_node.joinpath( + "configs", "c.1", func_name).symlink_to(function_path, True) + + def _cleanup_usb_gadget(self): + func_name = "{}.0".format(self.OTG_FUNCTION) + self.usb_gadget_node.joinpath("strings", "0x409").rmdir() + self.usb_gadget_node.joinpath("configs", "c.1", func_name).unlink(True) + self.usb_gadget_node.joinpath("configs", "c.1").rmdir() + self.usb_gadget_node.joinpath("functions", func_name).rmdir() + self.usb_gadget_node.rmdir() + + def enable_otg(self): + if self.udc_node.exists(): + self.usb_gadget_node.joinpath("UDC").write_text(self.udc_node.name) + else: + logging.error("UDC node '%s' not exists", self.udc_node) + raise ValueError(self.udc_node) - if not os.path.isdir(function_path): - os.makedirs(function_path) + def disable_otg(self): + self.usb_gadget_node.joinpath("UDC").write_text("") - if function == "mass_storage": - with tempfile.TemporaryDirectory() as tmp_dir: - img = os.path.join(tmp_dir, "lun0.img") - subprocess.run("dd if=/dev/zero of={} bs=1M count=16".format(img)) - subprocess.run("mkdosfs -F 32 {}".format(img)) + def self_check(self): + """ensure the USB device been generated. - with open(os.path.join(function_path, "lun.0", "file"), "w") as f: - f.write(img) + Returns: + bool: return True when a USB device been detected + """ + logging.debug("check") + return True - os.symlink( - function_path, - UDC_CONFIG.joinpath("{}.0".format(function)).name - ) + def detection_check_on_rpyc(self, rpyc_ip): + """ + This is a function to detect OTG device on client + """ + pass + def function_check_on_rpyc(self, rpyc_ip): + """ + this is a function to perform OTG testing on client + """ + pass -def otg_testing(method): - pass +class OtgMassStorageSetup(OtgConfigFsOperatorBase): -def _identify_udc_bus(otg_bus, udc_list): - for udc in udc_list: - if udc in otg_bus: - return udc - elif glob.glob( - "/sys/devices/platform/**/{}/{}*".format(udc, otg_bus) - ): - return udc - return "None" + OTG_FUNCTION = "mass_storage" + OTG_TARGET_MODULE = "usb_f_mass_storage" + def otg_setup(self): + """ + This is function for doing any extra step for specific OTG function + such as collecting ethernet interface, serial interface + and create an USB storage + """ + self._usb_img = tempfile.NamedTemporaryFile("+bw", delete=False) + subprocess.run( + "dd if=/dev/zero of={} bs=1M count=16".format(self._usb_img.name), + shell=True, + check=True, + ) + subprocess.run( + "mkdosfs -F 32 {}".format(self._usb_img.name), + shell=True, + check=True, + ) + logging.info("Create an USB image file to %s", self._usb_img.name) -def dump_otg_info(configs): - otg_nodes = glob.glob( - "/sys/firmware/devicetree/base/**/dr_mode", recursive=True - ) - udc_list = [os.path.basename(f) for f in glob.glob("/sys/class/udc/*")] - otg_mapping = {} - for node in otg_nodes: - mode = Path(node).read_text().strip() - usb_bus = re.search(r"usb@([a-z0-9]*)", node) - otg_mapping[usb_bus] = mode + def otg_teardown(self): + logging.info("Delete USB image file from %s", self._usb_img.name) + os.remove(self._usb_img.name) - for config in configs.split(): - otg_conf = config.split(":") - if len(otg_conf) == 2: - udc_bus = _identify_udc_bus(otg_conf[1], udc_list) - print("USB_port: {}".format(otg_conf[0])) - print("USB_node: {}".format(otg_conf[1])) - print("Mode: {}".format(otg_mapping.get(config[0], ""))) - print("UDC: {}".format(udc_bus)) - print() + def create_otg_function(self): + logging.info("create function") + self.enable_otg_module([self.OTG_TARGET_MODULE]) + func_name = "{}.0".format(self.OTG_FUNCTION) + function_path = self.usb_gadget_node.joinpath("functions", func_name) + if not function_path.exists(): + function_path.mkdir() -class ConfigFsOperator(tempfile.TemporaryDirectory): + function_path.joinpath("lun.0", "file").write_text(self._usb_img.name) - def __enter__(self): - subprocess.run( - "mount -t configfs none {}".format(self.name), - shell=True, - check=True - ) - super.__init__() - - def __exit__(self, exc, value, tb): - subprocess.run("umount {}".format(self.name)) - super.__exit__(exc, value, tb) - - def create_otg_gadget(self): - gadget_root = Path(self.name, "usb_gadget") - gadget_root.mkdir() - self.gadget_node = gadget_root.joinpath("g1") - self.gadget_node.mkdir() - - # create PID and VID file - self.gadget_node.joinpath("idVendor").write_text("0xabcd") - self.gadget_node.joinpath("idProduct").write_text("0x9999") - - # create serial no, manufacture and product - string_dir = self.gadget_node.joinpath("strings") - string_dir.mkdir() - lang_dir = string_dir.joinpath("0x409") - lang_dir.mkdir() - lang_dir.joinpath("serialnumber").write_text("1234567") - lang_dir.joinpath("manufacturer").write_text("canonical") - lang_dir.joinpath("product").write_text("otg_device") - - def create_otg_config(self): - config_root = self.gadget_node.joinpath("configs") - config_root.mkdir() - config_node = config_root.joinpath("c.1") - config_node.mkdir() - - config_node.joinpath("MaxPower").write_text("120") - string_dir = config_node.joinpath("strings") - string_dir.mkdir() - lang_dir = string_dir.joinpath("0x409") - lang_dir.mkdir() - lang_dir.joinpath("configuration").write_text("otg") + self.usb_gadget_node.joinpath( + "configs", "c.1", func_name).symlink_to(function_path, True) - def create_otg_function(self): - pass + def self_check(self): + time.sleep(10) -class OtgTestBase(): - """ - This is a object to setup the USB gadget to support different OTG scenario - Reference https://www.kernel.org/doc/Documentation/usb/gadget_configfs.txt - """ - def __init__(self, bus_addr): - self._addr = bus_addr - - def _get_related_libcomposite_modules(self): - ret = subprocess.run("lsmod | awk '/^libcomposite/ {print $4}'") - if ret.returncode == 0: - return ret.stdout.split(",") - return [] - - def _enable_libcomposite_module(self): - modules = self._get_related_libcomposite_modules() - if modules: - # libcomposite module has been loaded, unload corresponding module - for module in modules: - subprocess.run("modprobe -r {}".format(module), check=True) - else: - # load libcomposite - subprocess.run("modprobe {}".format(module), check=True) - def _identify_configfs_dir(self): +class OtgEthernetSetup(OtgConfigFsOperatorBase): + OTG_FUNCTION = "ecm" + OTG_TARGET_MODULE = "usb_f_ecm" - def _pre_setup_env(self): - self._enable_libcomposite_module() + def _collect_net_intfs(self): + return [os.path.basename(intf) for intf in glob.glob("/sys/class/net/*")] - def create_gadget(self): - pass + def otg_setup(self): + self._net_intfs = self._collect_net_intfs() - def create_config(self): - pass + def self_check(self): + """ + Ensure the ethernet device been generated by usb gadget - def create_function(self): - pass + Returns: + bool: Return True when an USB Ethernet interface been detected + """ + logging.info("Validate a new network interface been generated") + cur_net_intfs = self._collect_net_intfs() + if len(cur_net_intfs) == len(self._net_intfs): + raise RuntimeError("OTG network interface not available") - def associate_function_and_config(self): - pass + otg_net_intf = [x for x in cur_net_intfs if x not in self._net_intfs] + if len(otg_net_intf) != 1: + logging.error("Found more than one new interface. %s", otg_net_intf) + else: + logging.info("Found new network interface '%s'", otg_net_intf[0]) + self._net_dev = otg_net_intf[0] - def enable_gadget(self): - pass + def detection_check_on_rpyc(self, rpyc_ip): + ret = rpyc_client(rpyc_ip, "exposed_net_check") + if ret: + logging.info("Found %s network interface on rpyc server", ret) + else: + logging.debug("No network interface found on rpyc server") + self._target_net_dev = ret - def disable_gadget(self): - pass + def _configure_local_network(self, interface, net_info): + subprocess.check_output( + "ip addr add {} dev {}".format(net_info, interface), + shell=True, + text=True, + ) - def clean_up(self): - pass + def function_check_on_rpyc(self, rpyc_ip): + self._configure_local_network(self._net_dev, "169.254.0.1/24") + rpyc_client( + rpyc_ip, + "exposed_configure_local_network", + self._target_net_dev, + "169.254.0.10/24", + ) + ret = rpyc_client(rpyc_ip, "exposed_network_ping", "169.254.0.1", self._target_net_dev) + logging.debug(ret) -class OtgTest(): +class OtgSerialSetup(OtgConfigFsOperatorBase): - def __init__(self, mode, address): - self._mode = mode - self._address = address + OTG_FUNCTION = "acm" + OTG_TARGET_MODULE = "usb_f_acm" - def __enter__(self): - self._prepare_env() + def _collect_serial_intfs(self): + return [os.path.basename(intf) for intf in glob.glob("/dev/ttyGS*")] - def __exit__(self): - try: - UDC_NODE.write_text("") - shutil.rmtree(GADGET_PATH) - except Exception as err: - logging.error(err) + def otg_setup(self): + self._ser_intfs = self._collect_serial_intfs() - def _prepare_env(self): - _initial_gadget() - _create_otg_configs() - _create_function(self._mode) + def self_check(self): + """ + Ensure a Serial device been generated by usb gadget - def activate_otg(self): - # Activate OTG - UDC_NODE.write_text(self._address) + Returns: + bool: Return True when a Serial interface been detected + """ + logging.info("Validate a new serial interface been generated") + cur_ser_intfs = self._collect_serial_intfs() + if len(cur_ser_intfs) == len(self._ser_intfs): + raise RuntimeError("OTG network interface not available") - @classmethod - def mass_storage(cls, type, address): - rpyc_client() + otg_ser_intf = [x for x in cur_ser_intfs if x not in self._ser_intfs] + if len(otg_ser_intf) != 1: + logging.error("Found more than one new interface. %s", otg_ser_intf) + else: + logging.info("Found new network interface '%s'", otg_ser_intf[0]) + return otg_ser_intf[0] + + +OTG_TESTING_MAPPING = { + "mass_storage": { + "detection": "", + "function": "", + "setup": OtgMassStorageSetup, + }, + "ethernet": { + "detection": "", + "function": "", + "setup": OtgEthernetSetup, + }, + "serial": { + "detection": "", + "function": "", + "setup": OtgSerialSetup, + }, +} - def ethernet(self, type, address): - pass - def serial(self, type, address): - pass +def otg_testing(udc_node, test_func, rpyc_ip): + configfs_dir = initial_configfs() + with OTG_TESTING_MAPPING[test_func]["setup"](configfs_dir, udc_node) as otg_cfg: + otg_cfg.enable_otg() + otg_cfg.self_check() + otg_cfg.detection_check_on_rpyc(rpyc_ip) + otg_cfg.function_check_on_rpyc(rpyc_ip) + otg_cfg.disable_otg() def register_arguments(): - parser = argparse.ArgumentParser( - description="OTG test method" - ) + parser = argparse.ArgumentParser(description="OTG test method") sub_parser = parser.add_subparsers( dest="mode", @@ -287,9 +365,10 @@ def register_arguments(): "-t", "--type", required=True, - choices=["mass_storage", "ethernet", "serial"] + choices=["mass_storage", "ethernet", "serial"], ) - test_parser.add_argument("-a", "--address", required=True, type=str) + test_parser.add_argument("-u", "--udc-node", required=True, type=str) + test_parser.add_argument("--rpyc-address", required=True, type=str) info_parser = sub_parser.add_parser("info") info_parser.add_argument("-c", "--config", required=True, type=str) @@ -300,11 +379,10 @@ def register_arguments(): def main(): args = register_arguments() if args.mode == "test": - with prepare_env(): - getattr(OtgTest, args.type)(args.type, args.address) + otg_testing(args.udc_node, args.type, args.rpyc_address) elif args.mode == "info": dump_otg_info(args.config) if __name__ == "__main__": - main() \ No newline at end of file + main() From 177960347db5712517ce38ec14926dc371113dfe Mon Sep 17 00:00:00 2001 From: stanley31huang Date: Wed, 25 Dec 2024 17:05:26 +0800 Subject: [PATCH 09/17] add serial testing add serial testing --- .../bin/multiple_otg.py | 115 ++++++++++++++---- 1 file changed, 91 insertions(+), 24 deletions(-) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py index 40b5cc1799..a98ab615a7 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py @@ -10,6 +10,7 @@ from multiprocessing import Process from pathlib import Path from rpyc_client import rpyc_client +from threading import Thread from typing import Union OTG_MODULE = "libcomposite" @@ -183,7 +184,7 @@ def detection_check_on_rpyc(self, rpyc_ip): """ pass - def function_check_on_rpyc(self, rpyc_ip): + def function_check_with_rpyc(self, rpyc_ip): """ this is a function to perform OTG testing on client """ @@ -267,12 +268,12 @@ def self_check(self): self._net_dev = otg_net_intf[0] def detection_check_on_rpyc(self, rpyc_ip): - ret = rpyc_client(rpyc_ip, "exposed_net_check") + ret = rpyc_client(rpyc_ip, "net_check") if ret: logging.info("Found %s network interface on rpyc server", ret) else: logging.debug("No network interface found on rpyc server") - self._target_net_dev = ret + self._target_net_dev = list(ret)[0] def _configure_local_network(self, interface, net_info): subprocess.check_output( @@ -281,15 +282,15 @@ def _configure_local_network(self, interface, net_info): text=True, ) - def function_check_on_rpyc(self, rpyc_ip): + def function_check_with_rpyc(self, rpyc_ip): self._configure_local_network(self._net_dev, "169.254.0.1/24") rpyc_client( rpyc_ip, - "exposed_configure_local_network", + "configure_local_network", self._target_net_dev, "169.254.0.10/24", ) - ret = rpyc_client(rpyc_ip, "exposed_network_ping", "169.254.0.1", self._target_net_dev) + ret = rpyc_client(rpyc_ip, "network_ping", "169.254.0.1", self._target_net_dev) logging.debug(ret) @@ -321,35 +322,101 @@ def self_check(self): logging.error("Found more than one new interface. %s", otg_ser_intf) else: logging.info("Found new network interface '%s'", otg_ser_intf[0]) - return otg_ser_intf[0] + self._serial_iface = otg_ser_intf[0] + + def detection_check_on_rpyc(self, rpyc_ip): + ret = rpyc_client(rpyc_ip, "serial_check") + if ret: + logging.info("Found %s serial interface on rpyc server", ret) + else: + logging.debug("No serial interface found on rpyc server") + self._target_serial_dev = list(ret)[0] + + def function_check_with_rpyc(self, rpyc_ip): + logging.info("start serial server on rpyc server") + t_thread = Process( + target=rpyc_client, + args=( + rpyc_ip, + "enable_serial_server", + "/dev/serial/by-id/{}".format(self._target_serial_dev), + "USB", + [], + 115200, + 8, + "N", + 1, + 3, + 1024 + ) + ) + t_thread.start() + logging.info("perform serial client test on DUT") + func = getattr(import_module("serial_test"), "client_mode") + try: + func( + "/dev/{}".format(self._serial_iface), + "USB", + [], + 115200, + 8, + "N", + 1, + 3, + 1024, + ) + except SystemExit as err: + logging.debug(err) + t_thread.kill() + + # func = getattr(import_module("serial_test"), "server_mode") + # t_thread = Thread( + # target=func, + # args=( + # "/dev/{}".format(self._serial_iface), + # "USB", + # [], + # 115200, + # 8, + # "N", + # 1, + # 3, + # 1024, + # ) + # ) + # t_thread.start() + # try: + # rpyc_client(rpyc_ip, "serial_client_test", + # "/dev/serial/by-id/{}".format(self._target_serial_dev), + # "USB", + # [], + # 115200, + # 8, + # "N", + # 1, + # 3, + # 1024 + # ) + # except SystemExit as err: + # logging.debug(err) + # t_thread.join() + # t_thread.terminate() OTG_TESTING_MAPPING = { - "mass_storage": { - "detection": "", - "function": "", - "setup": OtgMassStorageSetup, - }, - "ethernet": { - "detection": "", - "function": "", - "setup": OtgEthernetSetup, - }, - "serial": { - "detection": "", - "function": "", - "setup": OtgSerialSetup, - }, + "mass_storage": OtgMassStorageSetup, + "ethernet": OtgEthernetSetup, + "serial": OtgSerialSetup, } def otg_testing(udc_node, test_func, rpyc_ip): configfs_dir = initial_configfs() - with OTG_TESTING_MAPPING[test_func]["setup"](configfs_dir, udc_node) as otg_cfg: + with OTG_TESTING_MAPPING[test_func](configfs_dir, udc_node) as otg_cfg: otg_cfg.enable_otg() otg_cfg.self_check() otg_cfg.detection_check_on_rpyc(rpyc_ip) - otg_cfg.function_check_on_rpyc(rpyc_ip) + otg_cfg.function_check_with_rpyc(rpyc_ip) otg_cfg.disable_otg() From 3a2415f4ac7aa4ab2f1587e14909d666ff6e9bfb Mon Sep 17 00:00:00 2001 From: stanley31huang Date: Fri, 3 Jan 2025 17:20:21 +0800 Subject: [PATCH 10/17] update OTG automated test scripts update OTG automated test scripts --- .../series_uc22/snap/snapcraft.yaml | 2 + .../series_uc24/snap/snapcraft.yaml | 2 + .../bin/multiple_otg.py | 196 ++++++++++-------- .../bin/rpyc_client.py | 32 +-- .../bin/rpyc_server.py | 181 ++++++++++++++++ .../bin/serial_test.py | 131 ++++++++++-- 6 files changed, 424 insertions(+), 120 deletions(-) create mode 100644 contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py diff --git a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc22/snap/snapcraft.yaml b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc22/snap/snapcraft.yaml index 58a3915d0e..34c4a77209 100644 --- a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc22/snap/snapcraft.yaml +++ b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc22/snap/snapcraft.yaml @@ -100,6 +100,8 @@ parts: - python3-urwid - python3-xlsxwriter - python3-requests-oauthlib + python-packages: + - rpyc input-pcspkr: plugin: nil after: [checkbox-provider-ce-oem] diff --git a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc24/snap/snapcraft.yaml b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc24/snap/snapcraft.yaml index c2e600e60e..076e610748 100644 --- a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc24/snap/snapcraft.yaml +++ b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc24/snap/snapcraft.yaml @@ -100,6 +100,8 @@ parts: - python3-urwid - python3-xlsxwriter - python3-requests-oauthlib + python-packages: + - rpyc input-pcspkr: plugin: nil after: [checkbox-provider-ce-oem] diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py index a98ab615a7..558d117d87 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py @@ -10,11 +10,11 @@ from multiprocessing import Process from pathlib import Path from rpyc_client import rpyc_client -from threading import Thread from typing import Union OTG_MODULE = "libcomposite" + logging.basicConfig(level=logging.DEBUG) @@ -39,15 +39,6 @@ def initial_configfs() -> Union[tempfile.TemporaryDirectory, Path]: return configfs_dir -def dump_otg_info(configs): - for config in configs.split(): - otg_conf = config.split(":") - if len(otg_conf) == 2: - print("USB_CONNECTOR: {}".format(otg_conf[0])) - print("USB_BUS: {}".format(otg_conf[1])) - print() - - class OtgConfigFsOperatorBase: """ This is a object to setup the USB gadget to support different OTG scenario @@ -64,7 +55,7 @@ def __init__(self, root_path: Path, udc_path: str): self.udc_node = Path("/sys/class/udc").joinpath(udc_path) def __enter__(self): - logging.debug("enter setup function") + logging.debug("Setup the OTG configurations") if self._child_modules: logging.info(self._child_modules) # To clean up the OTG modules @@ -82,7 +73,7 @@ def __enter__(self): return self def __exit__(self, exec, value, tb): - logging.debug("enter teardown function") + logging.debug("Clean up OTG configurations") self._cleanup_usb_gadget() cur_modules = [ mod for mod in self._get_child_modules() if mod not in self._child_modules] @@ -202,9 +193,10 @@ def otg_setup(self): such as collecting ethernet interface, serial interface and create an USB storage """ + logging.info("Create an USB image file for Mass Storage Test") self._usb_img = tempfile.NamedTemporaryFile("+bw", delete=False) subprocess.run( - "dd if=/dev/zero of={} bs=1M count=16".format(self._usb_img.name), + "dd if=/dev/zero of={} bs=1M count=1024".format(self._usb_img.name), shell=True, check=True, ) @@ -213,7 +205,7 @@ def otg_setup(self): shell=True, check=True, ) - logging.info("Create an USB image file to %s", self._usb_img.name) + logging.info("%s file been created", self._usb_img.name) def otg_teardown(self): logging.info("Delete USB image file from %s", self._usb_img.name) @@ -233,8 +225,44 @@ def create_otg_function(self): self.usb_gadget_node.joinpath( "configs", "c.1", func_name).symlink_to(function_path, True) - def self_check(self): - time.sleep(10) + def detection_check_on_rpyc(self, rpyc_ip): + logging.info("USB drive detection on RPYC") + mounted_drive = rpyc_client(rpyc_ip, "usb_drive_check", "usb2") + if mounted_drive: + logging.info( + "Found USB device and mounted as '%s' on rpyc server", + mounted_drive, + ) + else: + raise RuntimeError("No USB device found on rpyc server") + self._target_dev = mounted_drive + + def function_check_with_rpyc(self, rpyc_ip): + logging.info("USB read/write testing on RPYC") + raise SystemExit(rpyc_client( + rpyc_ip, + "usb_storage_test", + "usb2", + )) + + def otg_test_process(self, rpyc_ip): + logging.info("Start Mass Storage Testing with OTG interface") + t_thread = Process( + target=self.function_check_with_rpyc, args=(rpyc_ip,) + ) + t_thread.start() + logging.debug("Launch USB detection and storage tests on RPYC server") + # Sleep few seconds to activate USB detection on RPYC server + time.sleep(3) + self.enable_otg() + t_thread.join() + self.disable_otg() + + if t_thread.exitcode == 0: + logging.info("OTG Mass Storage test passed") + else: + logging.debug("Exit code: %s", t_thread.exitcode) + raise RuntimeError("OTG Mass Storage test failed") class OtgEthernetSetup(OtgConfigFsOperatorBase): @@ -268,11 +296,12 @@ def self_check(self): self._net_dev = otg_net_intf[0] def detection_check_on_rpyc(self, rpyc_ip): - ret = rpyc_client(rpyc_ip, "net_check") + logging.info("Network interface detection on RPYC") + ret = rpyc_client(rpyc_ip, "ethernet_check") if ret: logging.info("Found %s network interface on rpyc server", ret) else: - logging.debug("No network interface found on rpyc server") + raise RuntimeError("No network interface found on rpyc server") self._target_net_dev = list(ret)[0] def _configure_local_network(self, interface, net_info): @@ -283,6 +312,7 @@ def _configure_local_network(self, interface, net_info): ) def function_check_with_rpyc(self, rpyc_ip): + logging.info("Ping DUT from RPYC") self._configure_local_network(self._net_dev, "169.254.0.1/24") rpyc_client( rpyc_ip, @@ -290,8 +320,18 @@ def function_check_with_rpyc(self, rpyc_ip): self._target_net_dev, "169.254.0.10/24", ) - ret = rpyc_client(rpyc_ip, "network_ping", "169.254.0.1", self._target_net_dev) - logging.debug(ret) + ret = rpyc_client( + rpyc_ip, "network_ping", [self._target_net_dev], "169.254.0.1" + ) + if ret != 0: + raise RuntimeError("Failed to ping DUT from RPYC server") + + def otg_test_process(self, rpyc_ip): + self.enable_otg() + self.self_check() + self.detection_check_on_rpyc(rpyc_ip) + self.function_check_with_rpyc(rpyc_ip) + self.disable_otg() class OtgSerialSetup(OtgConfigFsOperatorBase): @@ -333,74 +373,51 @@ def detection_check_on_rpyc(self, rpyc_ip): self._target_serial_dev = list(ret)[0] def function_check_with_rpyc(self, rpyc_ip): - logging.info("start serial server on rpyc server") - t_thread = Process( - target=rpyc_client, - args=( - rpyc_ip, - "enable_serial_server", - "/dev/serial/by-id/{}".format(self._target_serial_dev), - "USB", - [], - 115200, - 8, - "N", - 1, - 3, - 1024 - ) - ) - t_thread.start() logging.info("perform serial client test on DUT") func = getattr(import_module("serial_test"), "client_mode") + func( + "/dev/{}".format(self._serial_iface), + "USB", + [], + 115200, + 8, + "N", + 1, + 3, + 1024, + ) + + def otg_test_process(self, rpyc_ip): + self.enable_otg() + self.self_check() + self.detection_check_on_rpyc(rpyc_ip) + try: - func( - "/dev/{}".format(self._serial_iface), - "USB", - [], - 115200, - 8, - "N", - 1, - 3, - 1024, + logging.info("start serial server on rpyc server") + t_thread = Process( + target=rpyc_client, + args=( + rpyc_ip, + "enable_serial_server", + "/dev/serial/by-id/{}".format(self._target_serial_dev), + "USB", + [], + 115200, + 8, + "N", + 1, + 3, + 1024 + ) ) + t_thread.start() + time.sleep(3) + self.function_check_with_rpyc(rpyc_ip) except SystemExit as err: logging.debug(err) - t_thread.kill() - - # func = getattr(import_module("serial_test"), "server_mode") - # t_thread = Thread( - # target=func, - # args=( - # "/dev/{}".format(self._serial_iface), - # "USB", - # [], - # 115200, - # 8, - # "N", - # 1, - # 3, - # 1024, - # ) - # ) - # t_thread.start() - # try: - # rpyc_client(rpyc_ip, "serial_client_test", - # "/dev/serial/by-id/{}".format(self._target_serial_dev), - # "USB", - # [], - # 115200, - # 8, - # "N", - # 1, - # 3, - # 1024 - # ) - # except SystemExit as err: - # logging.debug(err) - # t_thread.join() - # t_thread.terminate() + finally: + t_thread.kill() + self.disable_otg() OTG_TESTING_MAPPING = { @@ -413,11 +430,16 @@ def function_check_with_rpyc(self, rpyc_ip): def otg_testing(udc_node, test_func, rpyc_ip): configfs_dir = initial_configfs() with OTG_TESTING_MAPPING[test_func](configfs_dir, udc_node) as otg_cfg: - otg_cfg.enable_otg() - otg_cfg.self_check() - otg_cfg.detection_check_on_rpyc(rpyc_ip) - otg_cfg.function_check_with_rpyc(rpyc_ip) - otg_cfg.disable_otg() + otg_cfg.otg_test_process(rpyc_ip) + + +def dump_otg_info(configs): + for config in configs.split(): + otg_conf = config.split(":") + if len(otg_conf) == 2: + print("USB_CONNECTOR: {}".format(otg_conf[0])) + print("USB_BUS: {}".format(otg_conf[1])) + print() def register_arguments(): diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py index ec4f6d8a87..1c61248574 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py @@ -1,6 +1,5 @@ import time - -from importlib import import_module +import rpyc def rpyc_client(host, cmd, *args, **kwargs): @@ -16,18 +15,13 @@ def rpyc_client(host, cmd, *args, **kwargs): or the command is unknown or a service error occurs """ - try: - _rpyc = import_module("rpyc") - except ImportError: - try: - _rpyc = import_module("plainbox.vendor.rpyc") - except ImportError as exc: - msg = "RPyC not found. Neither from sys nor from Checkbox" - raise SystemExit(msg) from exc - for _ in range(2): try: - conn = _rpyc.connect(host, 60000, config={"allow_all_attrs": True}) + conn = rpyc.connect(host, 60000, config={ + "allow_all_attrs": True, + "allow_exposed_attrs": False + } + ) break except ConnectionRefusedError: time.sleep(1) @@ -35,12 +29,22 @@ def rpyc_client(host, cmd, *args, **kwargs): raise SystemExit("Cannot connect to RPYC Host.") try: - return getattr(conn.root, cmd)(*args, **kwargs) + func = getattr(conn.root, cmd) + wrap = rpyc.async_(func) + res = wrap(*args, **kwargs) + while res.ready: + print("Waiting for RPYC server complete %s".format(func)) + time.sleep(1) + break + if getattr(res._conn.root, "logs"): + print(res._conn.root.logs) + return res.value + # return getattr(conn.root, cmd)(*args, **kwargs) except AttributeError: raise SystemExit( "RPYC host does not provide a '{}' command.".format(cmd) ) - except _rpyc.core.vinegar.GenericException as exc: + except rpyc.core.vinegar.GenericException as exc: raise SystemExit( "Zapper host failed to process the requested command." ) from exc \ No newline at end of file diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py new file mode 100644 index 0000000000..2f301b31a7 --- /dev/null +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py @@ -0,0 +1,181 @@ +import io +import logging +import rpyc +import subprocess +import sys +import time + +from contextlib import redirect_stdout, redirect_stderr +from importlib import reload +from importlib.machinery import SourceFileLoader +from pathlib import Path +from rpyc.utils.server import ThreadedServer + +from checkbox_support.scripts import ( + run_watcher, + usb_read_write, +) + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +handler_std = logging.StreamHandler(sys.stdout) + + +LIBS = { + "enable_serial_server": { + "source": ( + "/snap/checkbox-ce-oem/current/providers/" + "checkbox-provider-ce-oem/bin/serial_test.py" + ), + "function": "server_mode", + }, + "network_ping": { + "source": ( + "/snap/checkbox22/current/providers/" + "checkbox-provider-base/bin/gateway_ping_test.py" + ), + "function": "perform_ping_test", + }, +} +dynamic_integrate_funcs = [LIBS[k]["function"] for k in LIBS.keys()] + + +def capture_io_logs(func): + def wrap(*args, **kwargs): + sio_stdout = io.StringIO() + sio_stderr = io.StringIO() + cls = args[0] + if func.__name__ in dynamic_integrate_funcs: + args = args[1:] + + with redirect_stderr(sio_stderr) as stderr, redirect_stdout(sio_stdout) as stdout: + try: + ret = func(*args, **kwargs) + except SystemExit as exp: + ret = exp.code + + cls.logs = "stdout logs: {}".format(stdout.getvalue()) + cls.logs += "\nstderr logs: {}".format(stderr.getvalue()) + return ret + return wrap + + +def _load_method_from_file(name, file, func): + try: + _module = SourceFileLoader(name, file).load_module() + return getattr(_module, func) + except FileNotFoundError as e: + logger.error("Failed to import module from %s file", file) + logger.debug(e) + except AttributeError as e: + logger.error("Failed to get %s function from %s file", func, file) + logger.debug(e) + + +class RpycTestService(rpyc.Service): + + def __init__(self): + super().__init__() + self.logs = "" + for key, value in LIBS.items(): + func = _load_method_from_file( + key, value["source"], value["function"] + ) + if func: + setattr(RpycTestService, key, capture_io_logs(func)) + + def on_connect(self, conn): + # code that runs when a connection is created + # (to init the service, if needed) + pass + + def on_disconnect(self, conn): + # code that runs after the connection has already closed + # (to finalize the service, if needed) + pass + + def _get_ethernet_ifaces(self): + return set([i.name for i in Path("/sys/class/net").iterdir()]) + + def _get_serial_ifaces(self): + return set([i.name for i in Path("/dev/serial/by-id").iterdir()]) + + def _device_node_detect(self, func, device_type=""): + starting_ifaces = func() + + attempts = 20 + while attempts > 0: + now_ifaces = func() + # check if something disappeared + if not starting_ifaces == now_ifaces & starting_ifaces: + raise SystemExit( + "Interface(s) disappeared: {}".format( + ", ".join(list(starting_ifaces - now_ifaces)) + ) + ) + new_ifaces = now_ifaces - starting_ifaces + if new_ifaces: + print() + print( + "New interface(s) detected: {}".format( + ", ".join(list(new_ifaces)) + ) + ) + return new_ifaces + time.sleep(1) + print(".", end="", flush=True) + attempts -= 1 + print() + raise SystemExit( + "Failed to detect new {} interface".format(device_type) + ) + + @capture_io_logs + def serial_check(self): + return self._device_node_detect(self._get_serial_ifaces, "serial") + + @capture_io_logs + def ethernet_check(self): + return self._device_node_detect(self._get_ethernet_ifaces, "ethernet") + + @capture_io_logs + def configure_local_network(self, interface, net_info): + logger.info("set %s to %s interface on RPYC", net_info, interface) + subprocess.check_output( + "ip addr add {} dev {}".format(net_info, interface), + shell=True, + text=True, + ) + + @capture_io_logs + def usb_storage_test(self, usb_type): + logger.info("%s drive read/write testing on RPYC", usb_type) + usb_read_write.REPETITION_NUM = 2 + watcher = run_watcher.USBStorage(usb_type) + mounted_partition = watcher.run_insertion() + logger.info( + "%s drive been mounted to %s on RPYC", usb_type, mounted_partition + ) + watcher.run_storage(mounted_partition) + # usb_read_write been imported in run_watcher + # the temporary mount point been created while import it and been removed in gen_random_file function + # thus, we need to reload it to create temporary mount point in every testing cycle + reload(usb_read_write) + + +def main(): + t = ThreadedServer( + RpycTestService, + port=60000, + logger=logger, + protocol_config={ + "allow_all_attrs": True, + "allow_exposed_attrs": False, + }, + ) + t.start() + + +if __name__ == "__main__": + main() diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py index 42bda9e548..2ea85f2558 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py @@ -130,7 +130,18 @@ def generate_random_string(length): return "".join(random.choice(letters) for _ in range(length)) -def server_mode(ser: Serial) -> None: +def server_mode( + node, + type, + group, + baudrate, + bytesize, + parity, + stopbits, + timeout, + datasize, + rs485_settings, +) -> None: """ Running as a server, it will be sniffing for received string. And it will send the same string out. @@ -138,6 +149,18 @@ def server_mode(ser: Serial) -> None: running on port /dev/ttyUSB0 as a server $ sudo ./serial_test.py /dev/ttyUSB0 --mode server --type USB """ + ser = Serial( + node, + type, + group, + baudrate, + bytesize, + parity, + stopbits, + timeout, + datasize, + rs485_settings, + ) logging.info("Listening on port {} ...".format(ser.node)) while True: data = ser.recv() @@ -148,7 +171,18 @@ def server_mode(ser: Serial) -> None: logging.info("Listening on port {} ...".format(ser.node)) -def client_mode(ser: Serial, data_size: int = 1024): +def client_mode( + node, + type, + group, + baudrate, + bytesize, + parity, + stopbits, + timeout, + datasize, + rs485_settings, +): """ Running as a clinet and it will sending out a string and wait the string send back from server. After receive the string, @@ -157,7 +191,19 @@ def client_mode(ser: Serial, data_size: int = 1024): running on port /dev/ttymxc1 as a client $ sudo ./serial_test.py /dev/ttymxc1 --mode client --type RS485 """ - random_string = generate_random_string(data_size) + ser = Serial( + node, + type, + group, + baudrate, + bytesize, + parity, + stopbits, + timeout, + datasize, + rs485_settings, + ) + random_string = generate_random_string(datasize) ser.send(random_string.encode()) for i in range(1, 6): logging.info("Attempting receive string... {} time".format(i)) @@ -174,13 +220,36 @@ def client_mode(ser: Serial, data_size: int = 1024): raise SystemExit(1) -def console_mode(ser: Serial): +def console_mode( + node, + type, + group, + baudrate, + bytesize, + parity, + stopbits, + timeout, + datasize, + rs485_settings, +): """ Test the serial port when it is in console mode This test requires DUT to loop back it self. For example: connect the serial console port to the USB port via serial to usb dongle """ + ser = Serial( + node, + type, + group, + baudrate, + bytesize, + parity, + stopbits, + timeout, + datasize, + rs485_settings, + ) try: # Send 'Enter Key' logging.info("Sending 'Enter Key'...") @@ -329,25 +398,49 @@ def main(): "delay_before_tx": args.rts_delay_before_tx, "delay_before_rx": args.rts_delay_before_rx, } - ser = Serial( - args.node, - args.type, - args.group, - baudrate=args.baudrate, - bytesize=args.bytesize, - parity=args.parity, - stopbits=args.stopbits, - timeout=args.timeout, - data_size=args.datasize, - rs485_settings=rs485_settings, - ) + else: + rs485_settings = None + if args.mode == "server": - server_mode(ser) + server_mode( + args.node, + args.type, + args.group, + args.baudrate, + args.bytesize, + args.parity, + args.stopbits, + args.timeout, + args.datasize, + rs485_settings, + ) elif args.mode == "client": - client_mode(ser, data_size=args.datasize) + client_mode( + args.node, + args.type, + args.group, + args.baudrate, + args.bytesize, + args.parity, + args.stopbits, + args.timeout, + args.datasize, + rs485_settings, + ) elif args.mode == "console": - console_mode(ser) + console_mode( + args.node, + args.type, + args.group, + args.baudrate, + args.bytesize, + args.parity, + args.stopbits, + args.timeout, + args.datasize, + rs485_settings, + ) else: raise SystemExit(1) From 845974f7470d13998bcf43b272a85564ee2f61fb Mon Sep 17 00:00:00 2001 From: stanley31huang Date: Mon, 6 Jan 2025 18:14:11 +0800 Subject: [PATCH 11/17] refactor rpyc server refactor rpyc server --- .../bin/rpyc_server.py | 187 ++++++++++-------- 1 file changed, 102 insertions(+), 85 deletions(-) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py index 2f301b31a7..38b476442d 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py @@ -37,6 +37,22 @@ ), "function": "perform_ping_test", }, + "serial_check": { + "source": __file__, + "function": "serial_check", + }, + "ethernet_check": { + "source": __file__, + "function": "ethernet_check", + }, + "configure_local_network": { + "source": __file__, + "function": "configure_local_network", + }, + "usb_storage_test": { + "source": __file__, + "function": "usb_storage_test", + }, } dynamic_integrate_funcs = [LIBS[k]["function"] for k in LIBS.keys()] @@ -73,100 +89,101 @@ def _load_method_from_file(name, file, func): logger.debug(e) -class RpycTestService(rpyc.Service): +def append_method_to_service(cls): + for key, value in LIBS.items(): + func = _load_method_from_file( + key, value["source"], value["function"] + ) + if func: + setattr(cls, key, capture_io_logs(func)) - def __init__(self): - super().__init__() - self.logs = "" - for key, value in LIBS.items(): - func = _load_method_from_file( - key, value["source"], value["function"] - ) - if func: - setattr(RpycTestService, key, capture_io_logs(func)) - - def on_connect(self, conn): - # code that runs when a connection is created - # (to init the service, if needed) - pass - - def on_disconnect(self, conn): - # code that runs after the connection has already closed - # (to finalize the service, if needed) - pass - - def _get_ethernet_ifaces(self): - return set([i.name for i in Path("/sys/class/net").iterdir()]) - - def _get_serial_ifaces(self): - return set([i.name for i in Path("/dev/serial/by-id").iterdir()]) - - def _device_node_detect(self, func, device_type=""): - starting_ifaces = func() - - attempts = 20 - while attempts > 0: - now_ifaces = func() - # check if something disappeared - if not starting_ifaces == now_ifaces & starting_ifaces: - raise SystemExit( - "Interface(s) disappeared: {}".format( - ", ".join(list(starting_ifaces - now_ifaces)) - ) + return cls + + +# Method for testing +def _get_ethernet_ifaces(): + return set([i.name for i in Path("/sys/class/net").iterdir()]) + + +def _get_serial_ifaces(): + return set([i.name for i in Path("/dev/serial/by-id").iterdir()]) + + +def _device_node_detect(func, device_type=""): + starting_ifaces = func() + + attempts = 20 + while attempts > 0: + now_ifaces = func() + # check if something disappeared + if not starting_ifaces == now_ifaces & starting_ifaces: + raise SystemExit( + "Interface(s) disappeared: {}".format( + ", ".join(list(starting_ifaces - now_ifaces)) ) - new_ifaces = now_ifaces - starting_ifaces - if new_ifaces: - print() - print( - "New interface(s) detected: {}".format( - ", ".join(list(new_ifaces)) - ) + ) + new_ifaces = now_ifaces - starting_ifaces + if new_ifaces: + print() + print( + "New interface(s) detected: {}".format( + ", ".join(list(new_ifaces)) ) - return new_ifaces - time.sleep(1) - print(".", end="", flush=True) - attempts -= 1 - print() - raise SystemExit( - "Failed to detect new {} interface".format(device_type) - ) + ) + return new_ifaces + time.sleep(1) + print(".", end="", flush=True) + attempts -= 1 + print() + raise SystemExit( + "Failed to detect new {} interface".format(device_type) + ) - @capture_io_logs - def serial_check(self): - return self._device_node_detect(self._get_serial_ifaces, "serial") - - @capture_io_logs - def ethernet_check(self): - return self._device_node_detect(self._get_ethernet_ifaces, "ethernet") - - @capture_io_logs - def configure_local_network(self, interface, net_info): - logger.info("set %s to %s interface on RPYC", net_info, interface) - subprocess.check_output( - "ip addr add {} dev {}".format(net_info, interface), - shell=True, - text=True, - ) - @capture_io_logs - def usb_storage_test(self, usb_type): - logger.info("%s drive read/write testing on RPYC", usb_type) - usb_read_write.REPETITION_NUM = 2 - watcher = run_watcher.USBStorage(usb_type) - mounted_partition = watcher.run_insertion() - logger.info( - "%s drive been mounted to %s on RPYC", usb_type, mounted_partition - ) - watcher.run_storage(mounted_partition) - # usb_read_write been imported in run_watcher - # the temporary mount point been created while import it and been removed in gen_random_file function - # thus, we need to reload it to create temporary mount point in every testing cycle - reload(usb_read_write) +def serial_check(): + return _device_node_detect(_get_serial_ifaces, "serial") + + +def ethernet_check(): + return _device_node_detect(_get_ethernet_ifaces, "ethernet") + + +def configure_local_network(interface, net_info): + logger.info("set %s to %s interface on RPYC", net_info, interface) + subprocess.check_output( + "ip addr add {} dev {}".format(net_info, interface), + shell=True, + text=True, + ) + + +def usb_storage_test(usb_type): + logger.info("%s drive read/write testing on RPYC", usb_type) + usb_read_write.REPETITION_NUM = 2 + watcher = run_watcher.USBStorage(usb_type) + mounted_partition = watcher.run_insertion() + logger.info( + "%s drive been mounted to %s on RPYC", usb_type, mounted_partition + ) + watcher.run_storage(mounted_partition) + # usb_read_write been imported in run_watcher + # the temporary mount point been created while import it and been removed in gen_random_file function + # thus, we need to reload it to create temporary mount point in every testing cycle + reload(usb_read_write) + + +class RpycTestService(rpyc.Service): + + def __init__(self): + super().__init__() + self.logs = "" def main(): + rpyc_service = append_method_to_service(RpycTestService()) + t = ThreadedServer( - RpycTestService, + rpyc_service, port=60000, logger=logger, protocol_config={ From cf0c89269b6261e1239edaf8354aeab618908a89 Mon Sep 17 00:00:00 2001 From: stanley31huang Date: Wed, 8 Jan 2025 15:09:30 +0800 Subject: [PATCH 12/17] fix rpyc server fix rpyc server --- .../checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py index 38b476442d..d30f29c388 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py @@ -180,7 +180,7 @@ def __init__(self): def main(): - rpyc_service = append_method_to_service(RpycTestService()) + rpyc_service = append_method_to_service(RpycTestService) t = ThreadedServer( rpyc_service, From 3798384d52382b71e875d4e80d76bbecc79b6da7 Mon Sep 17 00:00:00 2001 From: stanley31huang Date: Wed, 8 Jan 2025 18:02:50 +0800 Subject: [PATCH 13/17] add otg test plan and fixed issues add otg test plan and fixed issues --- .../bin/multiple_otg.py | 72 +++++++--- .../bin/rpyc_server.py | 19 +-- .../units/otg/jobs.pxu | 131 +++++++++++++++--- .../units/otg/test-plan.pxu | 5 + 4 files changed, 176 insertions(+), 51 deletions(-) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py index 558d117d87..4af504f671 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py @@ -7,12 +7,17 @@ import time from importlib import import_module +from importlib.machinery import SourceFileLoader from multiprocessing import Process from pathlib import Path from rpyc_client import rpyc_client from typing import Union OTG_MODULE = "libcomposite" +CHECKBOX_RUNTIME = os.environ.get("CHECKBOX_RUNTIME", "") +CHECKBOX_BASE_PROVIDER = os.path.join( + CHECKBOX_RUNTIME, "providers/checkbox-provider-base" +) logging.basicConfig(level=logging.DEBUG) @@ -48,11 +53,12 @@ class OtgConfigFsOperatorBase: OTG_FUNCTION = "" OTG_TARGET_MODULE = "" - def __init__(self, root_path: Path, udc_path: str): + def __init__(self, root_path: Path, udc_path: str, usb_type: str): self._child_modules = self._get_child_modules() self.root_path = root_path self.usb_gadget_node = None self.udc_node = Path("/sys/class/udc").joinpath(udc_path) + self.usb_type = usb_type def __enter__(self): logging.debug("Setup the OTG configurations") @@ -227,7 +233,7 @@ def create_otg_function(self): def detection_check_on_rpyc(self, rpyc_ip): logging.info("USB drive detection on RPYC") - mounted_drive = rpyc_client(rpyc_ip, "usb_drive_check", "usb2") + mounted_drive = rpyc_client(rpyc_ip, "usb_drive_check", self.usb_type) if mounted_drive: logging.info( "Found USB device and mounted as '%s' on rpyc server", @@ -242,7 +248,7 @@ def function_check_with_rpyc(self, rpyc_ip): raise SystemExit(rpyc_client( rpyc_ip, "usb_storage_test", - "usb2", + self.usb_type, )) def otg_test_process(self, rpyc_ip): @@ -265,6 +271,27 @@ def otg_test_process(self, rpyc_ip): raise RuntimeError("OTG Mass Storage test failed") +def configure_local_network(interface, net_info): + logging.info("Turn down the link of %s interface", interface) + subprocess.check_output( + "ip link set dev {} down".format(interface), + shell=True, + text=True, + ) + logging.info("Turn down the link of %s interface", interface) + subprocess.check_output( + "ip addr add {} dev {}".format(net_info, interface), + shell=True, + text=True, + ) + logging.info("Turn up the link of %s interface", interface) + subprocess.check_output( + "ip link set dev {} up".format(interface), + shell=True, + text=True, + ) + + class OtgEthernetSetup(OtgConfigFsOperatorBase): OTG_FUNCTION = "ecm" @@ -304,25 +331,22 @@ def detection_check_on_rpyc(self, rpyc_ip): raise RuntimeError("No network interface found on rpyc server") self._target_net_dev = list(ret)[0] - def _configure_local_network(self, interface, net_info): - subprocess.check_output( - "ip addr add {} dev {}".format(net_info, interface), - shell=True, - text=True, - ) - def function_check_with_rpyc(self, rpyc_ip): - logging.info("Ping DUT from RPYC") - self._configure_local_network(self._net_dev, "169.254.0.1/24") + configure_local_network(self._net_dev, "169.245.0.1/24") + logging.info("Configure the %s network on RPYC", self._target_net_dev) rpyc_client( rpyc_ip, "configure_local_network", self._target_net_dev, "169.254.0.10/24", ) - ret = rpyc_client( - rpyc_ip, "network_ping", [self._target_net_dev], "169.254.0.1" - ) + logging.info("Ping from DUT to Target") + _module = SourceFileLoader( + "_", + os.path.join(CHECKBOX_BASE_PROVIDER, "bin/gateway_ping_test.py") + ).load_module() + test_func = getattr(_module, "perform_ping_test") + ret = test_func([self._net_dev], "169.254.0.10") if ret != 0: raise RuntimeError("Failed to ping DUT from RPYC server") @@ -427,18 +451,21 @@ def otg_test_process(self, rpyc_ip): } -def otg_testing(udc_node, test_func, rpyc_ip): +def otg_testing(udc_node, test_func, rpyc_ip, usb_type): configfs_dir = initial_configfs() - with OTG_TESTING_MAPPING[test_func](configfs_dir, udc_node) as otg_cfg: + with OTG_TESTING_MAPPING[test_func]( + configfs_dir, udc_node, usb_type + ) as otg_cfg: otg_cfg.otg_test_process(rpyc_ip) def dump_otg_info(configs): for config in configs.split(): otg_conf = config.split(":") - if len(otg_conf) == 2: + if len(otg_conf) == 3: print("USB_CONNECTOR: {}".format(otg_conf[0])) - print("USB_BUS: {}".format(otg_conf[1])) + print("UDC_NODE: {}".format(otg_conf[1])) + print("USB_TYPE: {}".format(otg_conf[2])) print() @@ -457,6 +484,9 @@ def register_arguments(): choices=["mass_storage", "ethernet", "serial"], ) test_parser.add_argument("-u", "--udc-node", required=True, type=str) + test_parser.add_argument( + "--usb-type", default="usb2", type=str, choices=["usb2", "usb3"] + ) test_parser.add_argument("--rpyc-address", required=True, type=str) info_parser = sub_parser.add_parser("info") @@ -468,7 +498,9 @@ def register_arguments(): def main(): args = register_arguments() if args.mode == "test": - otg_testing(args.udc_node, args.type, args.rpyc_address) + otg_testing( + args.udc_node, args.type, args.rpyc_address, args.usb_type + ) elif args.mode == "info": dump_otg_info(args.config) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py index d30f29c388..cbf532a8ec 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py @@ -1,7 +1,7 @@ import io import logging +import os import rpyc -import subprocess import sys import time @@ -30,12 +30,12 @@ ), "function": "server_mode", }, - "network_ping": { + "configure_local_network": { "source": ( - "/snap/checkbox22/current/providers/" - "checkbox-provider-base/bin/gateway_ping_test.py" + "/snap/checkbox-ce-oem/current/providers/" + "checkbox-provider-ce-oem/bin/multiple_otg.py" ), - "function": "perform_ping_test", + "function": "configure_local_network", }, "serial_check": { "source": __file__, @@ -148,15 +148,6 @@ def ethernet_check(): return _device_node_detect(_get_ethernet_ifaces, "ethernet") -def configure_local_network(interface, net_info): - logger.info("set %s to %s interface on RPYC", net_info, interface) - subprocess.check_output( - "ip addr add {} dev {}".format(net_info, interface), - shell=True, - text=True, - ) - - def usb_storage_test(usb_type): logger.info("%s drive read/write testing on RPYC", usb_type) usb_read_write.REPETITION_NUM = 2 diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/jobs.pxu b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/jobs.pxu index 8aababc448..bceadc63ea 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/jobs.pxu +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/jobs.pxu @@ -24,15 +24,15 @@ id: ce-oem-otg/g_serial-{USB_port} estimated_duration: 60 plugin: user-interact-verify user: root -imports: +imports: from com.canonical.plainbox import manifest from com.canonical.certification import cpuinfo -requires: +requires: manifest.has_otg == 'True' cpuinfo.platform in ("aarch64", "armv7l") flags: also-after-suspend _summary: Check {USB_port} can be detected as a serial device -_purpose: +_purpose: Check that after connecting the device under test (DUT) to another device (host), {USB_port} can be detected as a serial device by the host. _steps: @@ -50,11 +50,11 @@ _steps: 7. Check the string sending from the host to {USB_port} by repeating steps 4-6, but swap the send and receive sides. _verification: Does string send and receive work? -command: +command: # shellcheck disable=SC2050 if [ {Mode} != "otg" ]; then echo -e "Error: USB mode is {Mode} mode, but expected in otg mode." - exit 1 + exit 1 fi multiple-otg.sh -u {UDC} -f acm @@ -62,14 +62,14 @@ unit: template template-resource: otg_ports template-unit: job category_id: com.canonical.plainbox::usb -id: ce-oem-otg/g_mass_storage-{USB_port} +id: ce-oem-otg/g_mass_storage-{USB_port} estimated_duration: 60 plugin: user-interact-verify user: root -imports: +imports: from com.canonical.plainbox import manifest from com.canonical.certification import cpuinfo -requires: +requires: manifest.has_otg == 'True' cpuinfo.platform in ("aarch64", "armv7l") flags: also-after-suspend @@ -79,16 +79,16 @@ _purpose: (host), {USB_port} can be detected as a mass storage device by the host. _steps: 1. Press Enter to setup a 16 MB FAT32 image on the device. - 2. From an Ubuntu host, connect a cable to the USB OTG port + 2. From an Ubuntu host, connect a cable to the USB OTG port (it's the USB connector) of the DUT. _verification: - The host detects and mounts a mass storage device. It has read and write + The host detects and mounts a mass storage device. It has read and write permissions on it. command: # shellcheck disable=SC2050 if [ {Mode} != "otg" ]; then echo -e "Error: USB mode is {Mode} mode, but expected in otg mode." - exit 1 + exit 1 fi multiple-otg.sh -u {UDC} -f mass_storage @@ -99,23 +99,23 @@ category_id: com.canonical.plainbox::usb id: ce-oem-otg/g_ether-{USB_port} plugin: user-interact-verify user: root -imports: +imports: from com.canonical.plainbox import manifest from com.canonical.certification import cpuinfo -requires: +requires: manifest.has_otg == 'True' cpuinfo.platform in ("aarch64", "armv7l") flags: also-after-suspend _summary: Check {USB_port} can be detected as USB ethernet device. -_purpose: +_purpose: Check that after connecting the device under test (DUT) to another device (host), {USB_port} can be detected as a USB ethernet device by the host. _steps: - 1. From an Ubuntu host, connect a cable to the USB OTG port + 1. From an Ubuntu host, connect a cable to the USB OTG port (it's the USB connector) of the {USB_port}. 2. Press Enter to config the IP address for the USB OTG ethernet interface on the DUT. (IP will be 192.168.9.1/24) - 3. On the host, config the IP address for the USB OTG ethernet interface. And it should be + 3. On the host, config the IP address for the USB OTG ethernet interface. And it should be the same subnet with {USB_port}. (eg. 192.168.9.2/24) 4. Try to ping between the host and {USB_port}. _verification: @@ -124,6 +124,103 @@ command: # shellcheck disable=SC2050 if [ {Mode} != "otg" ]; then echo -e "Error: USB mode is {Mode} mode, but expected in otg mode." - exit 1 + exit 1 fi multiple-otg.sh -u {UDC} -f ecm + + +id: otg_port_mapping +plugin: resource +_summary: Gather list of USB ports and UDC. +_description: + A USB port and UDC mapping resource that relies on the user specifying in config varirable. + Usage of parameter: OTG={port1}:{udc_node1}:{usb_type1} {port2}:{udc_node2}:{usb_type2} ... + e.g. OTG=USB-C1:11200000.usb:usb2 USB-Micro:112a1000.usb:usb3 +estimated_duration: 1s +environ: OTG +flags: preserve-locale +user: root +command: + if [ "$OTG" ]; then + multiple_otg.py info -c "$OTG" + else + echo "OTG config variable: not found" + fi + +unit: template +template-resource: otg_port_mapping +template-unit: job +template-engine: jinja2 +template-id: ce-oem-otg/otg-network-test +category_id: com.canonical.plainbox::usb +id: ce-oem-otg/OTG-Ethernet-{{ USB_CONNECTOR }}-{{ USB_TYPE }} +estimated_duration: 60 +plugin: shell +user: root +environ: + RYPC_SERVER +imports: + from com.canonical.plainbox import manifest + from com.canonical.certification import cpuinfo +requires: + manifest.has_otg == 'True' + cpuinfo.platform in ("aarch64", "armv7l") +flags: also-after-suspend +_summary: Check OTG {{ USB_CONNETOR }} can be detected as a network device and can ping from DUT to Server +_description: + The test will setup the OTG configuration within {{ UDC_NODE }} on the device under test (DUT) + Then setup the USB network interface on both DUT and RPYC server and perform ping test on DUT +command: + multiple_otg.py test -t ethernet -u {{ UDC_NODE }} --rpyc-address {{ RPYC_SERVER }} --usb-type {{ USB_TYPE }} + +unit: template +template-resource: otg_port_mapping +template-unit: job +template-engine: jinja2 +template-id: ce-oem-otg/otg-serial-test +category_id: com.canonical.plainbox::usb +id: ce-oem-otg/OTG-Serial-{{ USB_CONNECTOR }}-{{ USB_TYPE }} +estimated_duration: 60 +plugin: shell +user: root +environ: + RYPC_SERVER +imports: + from com.canonical.plainbox import manifest + from com.canonical.certification import cpuinfo +requires: + manifest.has_otg == 'True' + cpuinfo.platform in ("aarch64", "armv7l") +flags: also-after-suspend +_summary: Check OTG {{ USB_CONNETOR }} can be detected as a serial device and transfer data via OTG Serial +_description: + The test will setup the OTG configuration within {{ UDC_NODE }} on the device under test (DUT) + Then start serial server on RPYC server and perform data read/write test via serial interface +command: + multiple_otg.py test -t serial -u {{ UDC_NODE }} --rpyc-address {{ RPYC_SERVER }} --usb-type {{ USB_TYPE }} + +unit: template +template-resource: otg_port_mapping +template-unit: job +template-engine: jinja2 +template-id: ce-oem-otg/otg-mass-storage-test +category_id: com.canonical.plainbox::usb +id: ce-oem-otg/OTG-Mass-Storage-{{ USB_CONNECTOR }}-{{ USB_TYPE }} +estimated_duration: 60 +plugin: shell +user: root +environ: + RYPC_SERVER +imports: + from com.canonical.plainbox import manifest + from com.canonical.certification import cpuinfo +requires: + manifest.has_otg == 'True' + cpuinfo.platform in ("aarch64", "armv7l") +flags: also-after-suspend +_summary: Check OTG {{ USB_CONNETOR }} can be detected as a mass storage device and be able to access +_description: + The test will setup the OTG configuration within {{ UDC_NODE }} on the device under test (DUT) + the USB interface will be detected as a mass storage and perform data read/write test +command: + multiple_otg.py test -t mass_storage -u {{ UDC_NODE }} --rpyc-address {{ RPYC_SERVER }} --usb-type {{ USB_TYPE }} diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/test-plan.pxu b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/test-plan.pxu index f107fa9cca..417c7d6095 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/test-plan.pxu +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/test-plan.pxu @@ -22,7 +22,12 @@ id: ce-oem-otg-automated unit: test plan _name: OTG auto tests _description: Automated OTG tests for devices +bootstrap_include: + otg_port_mapping include: + ce-oem-otg/otg-network-test + ce-oem-otg/otg-serial-test + ce-oem-otg/otg-mass-storage-test id: after-suspend-ce-oem-otg-manual unit: test plan From 88c2fcc834a95d67181d9a1d4ba4534f57a3ae47 Mon Sep 17 00:00:00 2001 From: stanley31huang Date: Thu, 9 Jan 2025 17:52:01 +0800 Subject: [PATCH 14/17] fixed job and scripts issue fixed job and scripts issue --- .../series_classic22/snap/snapcraft.yaml | 1 + .../series_classic24/snap/snapcraft.yaml | 1 + .../series_uc22/snap/snapcraft.yaml | 3 +- .../series_uc24/snap/snapcraft.yaml | 3 +- .../bin/multiple_otg.py | 30 ++--- .../bin/rpyc_server.py | 105 ++++-------------- .../bin/rpyc_test_methods.py | 94 ++++++++++++++++ .../units/otg/jobs.pxu | 73 ++++++++---- .../units/otg/manifest.pxu | 7 +- 9 files changed, 179 insertions(+), 138 deletions(-) mode change 100644 => 100755 contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py create mode 100755 contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py diff --git a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic22/snap/snapcraft.yaml b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic22/snap/snapcraft.yaml index 32d88b8d46..91e83e557f 100644 --- a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic22/snap/snapcraft.yaml +++ b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic22/snap/snapcraft.yaml @@ -45,6 +45,7 @@ parts: - device-tree-compiler - linuxptp - snmp + - python3-rpyc override-prime: | snapcraftctl prime rm lib/systemd/system/alsa-utils.service diff --git a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic24/snap/snapcraft.yaml b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic24/snap/snapcraft.yaml index e246a6c2e8..b94f703034 100644 --- a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic24/snap/snapcraft.yaml +++ b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_classic24/snap/snapcraft.yaml @@ -45,6 +45,7 @@ parts: - device-tree-compiler - linuxptp - snmp + - python3-rpyc override-prime: | craftctl default rm lib/systemd/system/alsa-utils.service diff --git a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc22/snap/snapcraft.yaml b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc22/snap/snapcraft.yaml index 34c4a77209..2338bf1de4 100644 --- a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc22/snap/snapcraft.yaml +++ b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc22/snap/snapcraft.yaml @@ -82,6 +82,7 @@ parts: - device-tree-compiler - linuxptp - snmp + - python3-rpyc override-prime: | snapcraftctl prime rm lib/systemd/system/alsa-utils.service @@ -100,8 +101,6 @@ parts: - python3-urwid - python3-xlsxwriter - python3-requests-oauthlib - python-packages: - - rpyc input-pcspkr: plugin: nil after: [checkbox-provider-ce-oem] diff --git a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc24/snap/snapcraft.yaml b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc24/snap/snapcraft.yaml index 076e610748..894335b0ff 100644 --- a/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc24/snap/snapcraft.yaml +++ b/contrib/checkbox-ce-oem/checkbox-ce-oem-snap/series_uc24/snap/snapcraft.yaml @@ -82,6 +82,7 @@ parts: - device-tree-compiler - linuxptp - snmp + - python3-rpyc override-prime: | craftctl default rm lib/systemd/system/alsa-utils.service @@ -100,8 +101,6 @@ parts: - python3-urwid - python3-xlsxwriter - python3-requests-oauthlib - python-packages: - - rpyc input-pcspkr: plugin: nil after: [checkbox-provider-ce-oem] diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py index 4af504f671..888ee251e0 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + import argparse import glob import logging @@ -11,6 +13,7 @@ from multiprocessing import Process from pathlib import Path from rpyc_client import rpyc_client +from rpyc_test_methods import configure_local_network from typing import Union OTG_MODULE = "libcomposite" @@ -61,7 +64,9 @@ def __init__(self, root_path: Path, udc_path: str, usb_type: str): self.usb_type = usb_type def __enter__(self): - logging.debug("Setup the OTG configurations") + logging.info( + "Setup the OTG configurations on %s UDC node", self.udc_node + ) if self._child_modules: logging.info(self._child_modules) # To clean up the OTG modules @@ -271,27 +276,6 @@ def otg_test_process(self, rpyc_ip): raise RuntimeError("OTG Mass Storage test failed") -def configure_local_network(interface, net_info): - logging.info("Turn down the link of %s interface", interface) - subprocess.check_output( - "ip link set dev {} down".format(interface), - shell=True, - text=True, - ) - logging.info("Turn down the link of %s interface", interface) - subprocess.check_output( - "ip addr add {} dev {}".format(net_info, interface), - shell=True, - text=True, - ) - logging.info("Turn up the link of %s interface", interface) - subprocess.check_output( - "ip link set dev {} up".format(interface), - shell=True, - text=True, - ) - - class OtgEthernetSetup(OtgConfigFsOperatorBase): OTG_FUNCTION = "ecm" @@ -332,7 +316,7 @@ def detection_check_on_rpyc(self, rpyc_ip): self._target_net_dev = list(ret)[0] def function_check_with_rpyc(self, rpyc_ip): - configure_local_network(self._net_dev, "169.245.0.1/24") + configure_local_network(self._net_dev, "169.254.0.1/24") logging.info("Configure the %s network on RPYC", self._target_net_dev) rpyc_client( rpyc_ip, diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py old mode 100644 new mode 100755 index cbf532a8ec..271f876c87 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py @@ -3,54 +3,49 @@ import os import rpyc import sys -import time from contextlib import redirect_stdout, redirect_stderr -from importlib import reload from importlib.machinery import SourceFileLoader -from pathlib import Path from rpyc.utils.server import ThreadedServer -from checkbox_support.scripts import ( - run_watcher, - usb_read_write, -) logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) handler_std = logging.StreamHandler(sys.stdout) - +CHECKBOX_PROVIDER_CEOEM_PATH = ( + "/snap/checkbox-ce-oem/current/providers/checkbox-provider-ce-oem/bin" +) LIBS = { "enable_serial_server": { - "source": ( - "/snap/checkbox-ce-oem/current/providers/" - "checkbox-provider-ce-oem/bin/serial_test.py" + "source": os.path.join( + CHECKBOX_PROVIDER_CEOEM_PATH, "serial_test.py" ), "function": "server_mode", }, - "configure_local_network": { - "source": ( - "/snap/checkbox-ce-oem/current/providers/" - "checkbox-provider-ce-oem/bin/multiple_otg.py" - ), - "function": "configure_local_network", - }, "serial_check": { - "source": __file__, + "source": os.path.join( + CHECKBOX_PROVIDER_CEOEM_PATH, "rpyc_test_methods.py" + ), "function": "serial_check", }, "ethernet_check": { - "source": __file__, + "source": os.path.join( + CHECKBOX_PROVIDER_CEOEM_PATH, "rpyc_test_methods.py" + ), "function": "ethernet_check", }, "configure_local_network": { - "source": __file__, + "source": os.path.join( + CHECKBOX_PROVIDER_CEOEM_PATH, "rpyc_test_methods.py" + ), "function": "configure_local_network", }, "usb_storage_test": { - "source": __file__, + "source": os.path.join( + CHECKBOX_PROVIDER_CEOEM_PATH, "rpyc_test_methods.py" + ), "function": "usb_storage_test", }, } @@ -69,7 +64,8 @@ def wrap(*args, **kwargs): try: ret = func(*args, **kwargs) except SystemExit as exp: - ret = exp.code + logging.error(exp.code) + ret = None cls.logs = "stdout logs: {}".format(stdout.getvalue()) cls.logs += "\nstderr logs: {}".format(stderr.getvalue()) @@ -100,69 +96,6 @@ def append_method_to_service(cls): return cls -# Method for testing -def _get_ethernet_ifaces(): - return set([i.name for i in Path("/sys/class/net").iterdir()]) - - -def _get_serial_ifaces(): - return set([i.name for i in Path("/dev/serial/by-id").iterdir()]) - - -def _device_node_detect(func, device_type=""): - starting_ifaces = func() - - attempts = 20 - while attempts > 0: - now_ifaces = func() - # check if something disappeared - if not starting_ifaces == now_ifaces & starting_ifaces: - raise SystemExit( - "Interface(s) disappeared: {}".format( - ", ".join(list(starting_ifaces - now_ifaces)) - ) - ) - new_ifaces = now_ifaces - starting_ifaces - if new_ifaces: - print() - print( - "New interface(s) detected: {}".format( - ", ".join(list(new_ifaces)) - ) - ) - return new_ifaces - time.sleep(1) - print(".", end="", flush=True) - attempts -= 1 - print() - raise SystemExit( - "Failed to detect new {} interface".format(device_type) - ) - - -def serial_check(): - return _device_node_detect(_get_serial_ifaces, "serial") - - -def ethernet_check(): - return _device_node_detect(_get_ethernet_ifaces, "ethernet") - - -def usb_storage_test(usb_type): - logger.info("%s drive read/write testing on RPYC", usb_type) - usb_read_write.REPETITION_NUM = 2 - watcher = run_watcher.USBStorage(usb_type) - mounted_partition = watcher.run_insertion() - logger.info( - "%s drive been mounted to %s on RPYC", usb_type, mounted_partition - ) - watcher.run_storage(mounted_partition) - # usb_read_write been imported in run_watcher - # the temporary mount point been created while import it and been removed in gen_random_file function - # thus, we need to reload it to create temporary mount point in every testing cycle - reload(usb_read_write) - - class RpycTestService(rpyc.Service): def __init__(self): diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py new file mode 100755 index 0000000000..44d752690d --- /dev/null +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py @@ -0,0 +1,94 @@ +import logging +import subprocess +import time + +from importlib import reload +from pathlib import Path +from checkbox_support.scripts import ( + run_watcher, + usb_read_write, +) + + +# Method for testing +def _get_ethernet_ifaces(): + return set([i.name for i in Path("/sys/class/net").iterdir()]) + + +def _get_serial_ifaces(): + return set([i.name for i in Path("/dev/serial/by-id").iterdir()]) + + +def _device_node_detect(func, device_type=""): + starting_ifaces = func() + + attempts = 20 + while attempts > 0: + now_ifaces = func() + # check if something disappeared + if not starting_ifaces == now_ifaces & starting_ifaces: + raise SystemExit( + "Interface(s) disappeared: {}".format( + ", ".join(list(starting_ifaces - now_ifaces)) + ) + ) + new_ifaces = now_ifaces - starting_ifaces + if new_ifaces: + print() + print( + "New interface(s) detected: {}".format( + ", ".join(list(new_ifaces)) + ) + ) + return new_ifaces + time.sleep(1) + print(".", end="", flush=True) + attempts -= 1 + print() + raise SystemExit( + "Failed to detect new {} interface".format(device_type) + ) + + +def configure_local_network(interface, net_info): + logging.info("Turn down the link of %s interface", interface) + subprocess.check_output( + "ip link set dev {} down".format(interface), + shell=True, + text=True, + ) + logging.info("Turn down the link of %s interface", interface) + subprocess.check_output( + "ip addr add {} dev {}".format(net_info, interface), + shell=True, + text=True, + ) + logging.info("Turn up the link of %s interface", interface) + subprocess.check_output( + "ip link set dev {} up".format(interface), + shell=True, + text=True, + ) + + +def serial_check(): + return _device_node_detect(_get_serial_ifaces, "serial") + + +def ethernet_check(): + return _device_node_detect(_get_ethernet_ifaces, "ethernet") + + +def usb_storage_test(usb_type): + logging.info("%s drive read/write testing on RPYC", usb_type) + usb_read_write.REPETITION_NUM = 2 + watcher = run_watcher.USBStorage(usb_type) + mounted_partition = watcher.run_insertion() + logging.info( + "%s drive been mounted to %s on RPYC", usb_type, mounted_partition + ) + watcher.run_storage(mounted_partition) + # usb_read_write been imported in run_watcher + # the temporary mount point been created while import it and been removed in gen_random_file function + # thus, we need to reload it to create temporary mount point in every testing cycle + reload(usb_read_write) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/jobs.pxu b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/jobs.pxu index bceadc63ea..ae69d49229 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/jobs.pxu +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/jobs.pxu @@ -130,21 +130,22 @@ command: id: otg_port_mapping +category_id: com.canonical.plainbox::usb plugin: resource _summary: Gather list of USB ports and UDC. _description: A USB port and UDC mapping resource that relies on the user specifying in config varirable. - Usage of parameter: OTG={port1}:{udc_node1}:{usb_type1} {port2}:{udc_node2}:{usb_type2} ... - e.g. OTG=USB-C1:11200000.usb:usb2 USB-Micro:112a1000.usb:usb3 + Usage of parameter: USB_OTG_MAPPING={port1}:{udc_node1}:{usb_type1} {port2}:{udc_node2}:{usb_type2} ... + e.g. USB_OTG_MAPPING=USB-C1:11200000.usb:usb2 USB-Micro:112a1000.usb:usb3 estimated_duration: 1s -environ: OTG +environ: USB_OTG_MAPPING flags: preserve-locale user: root command: - if [ "$OTG" ]; then - multiple_otg.py info -c "$OTG" + if [ "$USB_OTG_MAPPING" ]; then + multiple_otg.py info -c "$USB_OTG_MAPPING" else - echo "OTG config variable: not found" + echo "USB_OTG_MAPPING config variable: not found" fi unit: template @@ -152,8 +153,15 @@ template-resource: otg_port_mapping template-unit: job template-engine: jinja2 template-id: ce-oem-otg/otg-network-test +_template-summary: + OTG USB drive read/write test +_purpose: + Validate DUT can be detected as a network device through OTG {{ USB_CONNETOR }} and can ping to RPYC Server +_description: + The test will setup the OTG configuration within {{ UDC_NODE }} on the device under test (DUT) + Then setup the USB network interface on both DUT and RPYC server and perform ping test on DUT category_id: com.canonical.plainbox::usb -id: ce-oem-otg/OTG-Ethernet-{{ USB_CONNECTOR }}-{{ USB_TYPE }} +id: ce-oem-otg/OTG-Ethernet-on-port-{{ USB_CONNECTOR }} estimated_duration: 60 plugin: shell user: root @@ -164,22 +172,30 @@ imports: from com.canonical.certification import cpuinfo requires: manifest.has_otg == 'True' + manifest.has_rpyc_otg_server == 'True' cpuinfo.platform in ("aarch64", "armv7l") flags: also-after-suspend -_summary: Check OTG {{ USB_CONNETOR }} can be detected as a network device and can ping from DUT to Server -_description: - The test will setup the OTG configuration within {{ UDC_NODE }} on the device under test (DUT) - Then setup the USB network interface on both DUT and RPYC server and perform ping test on DUT command: - multiple_otg.py test -t ethernet -u {{ UDC_NODE }} --rpyc-address {{ RPYC_SERVER }} --usb-type {{ USB_TYPE }} + if [ -z "$RPYC_SERVER" ]; then + echo "Error: RPYC_SERVER is not defined" + exit 1 + fi + multiple_otg.py test -t ethernet -u {{ UDC_NODE }} --rpyc-address "$RPYC_SERVER" --usb-type {{ USB_TYPE }} unit: template template-resource: otg_port_mapping template-unit: job template-engine: jinja2 template-id: ce-oem-otg/otg-serial-test +_template-summary: + OTG USB drive read/write test +_purpose: + Validate DUT can be detected as a serial device through OTG {{ USB_CONNETOR }} and transfer data via OTG Serial +_description: + The test will setup the OTG configuration within {{ UDC_NODE }} on the device under test (DUT) + Then start serial server on RPYC server and perform data read/write test via serial interface category_id: com.canonical.plainbox::usb -id: ce-oem-otg/OTG-Serial-{{ USB_CONNECTOR }}-{{ USB_TYPE }} +id: ce-oem-otg/OTG-Serial-on-port-{{ USB_CONNECTOR }} estimated_duration: 60 plugin: shell user: root @@ -190,22 +206,30 @@ imports: from com.canonical.certification import cpuinfo requires: manifest.has_otg == 'True' + manifest.has_rpyc_otg_server == 'True' cpuinfo.platform in ("aarch64", "armv7l") flags: also-after-suspend -_summary: Check OTG {{ USB_CONNETOR }} can be detected as a serial device and transfer data via OTG Serial -_description: - The test will setup the OTG configuration within {{ UDC_NODE }} on the device under test (DUT) - Then start serial server on RPYC server and perform data read/write test via serial interface command: - multiple_otg.py test -t serial -u {{ UDC_NODE }} --rpyc-address {{ RPYC_SERVER }} --usb-type {{ USB_TYPE }} + if [ -z "$RPYC_SERVER" ]; then + echo "Error: RPYC_SERVER is not defined" + exit 1 + fi + multiple_otg.py test -t serial -u {{ UDC_NODE }} --rpyc-address "$RPYC_SERVER" --usb-type {{ USB_TYPE }} unit: template template-resource: otg_port_mapping template-unit: job template-engine: jinja2 template-id: ce-oem-otg/otg-mass-storage-test +_template-summary: + OTG USB drive read/write test +_purpose: + Validate DUT can be detected as a mass storage device through OTG {{ USB_CONNETOR }} and be able to access +_description: + The test will setup the OTG configuration within {{ UDC_NODE }} on the device under test (DUT) + the USB interface will be detected as a mass storage and perform data read/write test category_id: com.canonical.plainbox::usb -id: ce-oem-otg/OTG-Mass-Storage-{{ USB_CONNECTOR }}-{{ USB_TYPE }} +id: ce-oem-otg/OTG-Mass-Storage-on-port-{{ USB_CONNECTOR }} estimated_duration: 60 plugin: shell user: root @@ -216,11 +240,12 @@ imports: from com.canonical.certification import cpuinfo requires: manifest.has_otg == 'True' + manifest.has_rpyc_otg_server == 'True' cpuinfo.platform in ("aarch64", "armv7l") flags: also-after-suspend -_summary: Check OTG {{ USB_CONNETOR }} can be detected as a mass storage device and be able to access -_description: - The test will setup the OTG configuration within {{ UDC_NODE }} on the device under test (DUT) - the USB interface will be detected as a mass storage and perform data read/write test command: - multiple_otg.py test -t mass_storage -u {{ UDC_NODE }} --rpyc-address {{ RPYC_SERVER }} --usb-type {{ USB_TYPE }} + if [ -z "$RPYC_SERVER" ]; then + echo "Error: RPYC_SERVER is not defined" + exit 1 + fi + multiple_otg.py test -t mass_storage -u {{ UDC_NODE }} --rpyc-address "$RPYC_SERVER" --usb-type {{ USB_TYPE }} diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/manifest.pxu b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/manifest.pxu index 57ee1f2c65..75e651a687 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/manifest.pxu +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/otg/manifest.pxu @@ -1,4 +1,9 @@ unit: manifest entry id: has_otg _name: Does platfrom supported USB OTG? -value-type: bool \ No newline at end of file +value-type: bool + +unit: manifest entry +id: has_rpyc_otg_server +_name: Does RPYC server with OTG test services been setup? +value-type: bool From 5a7259cb21034aff1173552a8d31fa7cb319ec16 Mon Sep 17 00:00:00 2001 From: stanley31huang Date: Fri, 10 Jan 2025 09:29:11 +0800 Subject: [PATCH 15/17] fix black8 issue fix black8 issue --- .../bin/multiple_otg.py | 64 ++++++++++++------- .../bin/rpyc_client.py | 10 +-- .../bin/rpyc_server.py | 14 ++-- .../bin/rpyc_test_methods.py | 4 +- 4 files changed, 52 insertions(+), 40 deletions(-) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py index 888ee251e0..e564c4c318 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/multiple_otg.py @@ -74,8 +74,7 @@ def __enter__(self): self.enable_otg_module([OTG_MODULE]) self.usb_gadget_node = Path( tempfile.TemporaryDirectory( - dir=self.root_path.joinpath("usb_gadget"), - prefix="udc_" + dir=self.root_path.joinpath("usb_gadget"), prefix="udc_" ).name ) self.otg_setup() @@ -87,7 +86,10 @@ def __exit__(self, exec, value, tb): logging.debug("Clean up OTG configurations") self._cleanup_usb_gadget() cur_modules = [ - mod for mod in self._get_child_modules() if mod not in self._child_modules] + mod + for mod in self._get_child_modules() + if mod not in self._child_modules + ] self.disable_otg_related_modules(cur_modules) if self._child_modules: self.enable_otg_module(self._child_modules) @@ -104,11 +106,15 @@ def _get_child_modules(self): def enable_otg_module(self, modules): for module in modules: - subprocess.run("modprobe {}".format(module), shell=True, check=True) + subprocess.run( + "modprobe {}".format(module), shell=True, check=True + ) def disable_otg_related_modules(self, modules): for module in modules: - subprocess.run("modprobe -r {}".format(module), shell=True, check=True) + subprocess.run( + "modprobe -r {}".format(module), shell=True, check=True + ) def otg_setup(self): """ @@ -150,8 +156,9 @@ def create_otg_function(self): if not function_path.exists(): function_path.mkdir() - self.usb_gadget_node.joinpath( - "configs", "c.1", func_name).symlink_to(function_path, True) + self.usb_gadget_node.joinpath("configs", "c.1", func_name).symlink_to( + function_path, True + ) def _cleanup_usb_gadget(self): func_name = "{}.0".format(self.OTG_FUNCTION) @@ -207,7 +214,9 @@ def otg_setup(self): logging.info("Create an USB image file for Mass Storage Test") self._usb_img = tempfile.NamedTemporaryFile("+bw", delete=False) subprocess.run( - "dd if=/dev/zero of={} bs=1M count=1024".format(self._usb_img.name), + "dd if=/dev/zero of={} bs=1M count=1024".format( + self._usb_img.name + ), shell=True, check=True, ) @@ -233,8 +242,9 @@ def create_otg_function(self): function_path.joinpath("lun.0", "file").write_text(self._usb_img.name) - self.usb_gadget_node.joinpath( - "configs", "c.1", func_name).symlink_to(function_path, True) + self.usb_gadget_node.joinpath("configs", "c.1", func_name).symlink_to( + function_path, True + ) def detection_check_on_rpyc(self, rpyc_ip): logging.info("USB drive detection on RPYC") @@ -250,11 +260,13 @@ def detection_check_on_rpyc(self, rpyc_ip): def function_check_with_rpyc(self, rpyc_ip): logging.info("USB read/write testing on RPYC") - raise SystemExit(rpyc_client( - rpyc_ip, - "usb_storage_test", - self.usb_type, - )) + raise SystemExit( + rpyc_client( + rpyc_ip, + "usb_storage_test", + self.usb_type, + ) + ) def otg_test_process(self, rpyc_ip): logging.info("Start Mass Storage Testing with OTG interface") @@ -282,7 +294,9 @@ class OtgEthernetSetup(OtgConfigFsOperatorBase): OTG_TARGET_MODULE = "usb_f_ecm" def _collect_net_intfs(self): - return [os.path.basename(intf) for intf in glob.glob("/sys/class/net/*")] + return [ + os.path.basename(intf) for intf in glob.glob("/sys/class/net/*") + ] def otg_setup(self): self._net_intfs = self._collect_net_intfs() @@ -301,7 +315,9 @@ def self_check(self): otg_net_intf = [x for x in cur_net_intfs if x not in self._net_intfs] if len(otg_net_intf) != 1: - logging.error("Found more than one new interface. %s", otg_net_intf) + logging.error( + "Found more than one new interface. %s", otg_net_intf + ) else: logging.info("Found new network interface '%s'", otg_net_intf[0]) self._net_dev = otg_net_intf[0] @@ -327,7 +343,7 @@ def function_check_with_rpyc(self, rpyc_ip): logging.info("Ping from DUT to Target") _module = SourceFileLoader( "_", - os.path.join(CHECKBOX_BASE_PROVIDER, "bin/gateway_ping_test.py") + os.path.join(CHECKBOX_BASE_PROVIDER, "bin/gateway_ping_test.py"), ).load_module() test_func = getattr(_module, "perform_ping_test") ret = test_func([self._net_dev], "169.254.0.10") @@ -367,7 +383,9 @@ def self_check(self): otg_ser_intf = [x for x in cur_ser_intfs if x not in self._ser_intfs] if len(otg_ser_intf) != 1: - logging.error("Found more than one new interface. %s", otg_ser_intf) + logging.error( + "Found more than one new interface. %s", otg_ser_intf + ) else: logging.info("Found new network interface '%s'", otg_ser_intf[0]) self._serial_iface = otg_ser_intf[0] @@ -415,8 +433,8 @@ def otg_test_process(self, rpyc_ip): "N", 1, 3, - 1024 - ) + 1024, + ), ) t_thread.start() time.sleep(3) @@ -482,9 +500,7 @@ def register_arguments(): def main(): args = register_arguments() if args.mode == "test": - otg_testing( - args.udc_node, args.type, args.rpyc_address, args.usb_type - ) + otg_testing(args.udc_node, args.type, args.rpyc_address, args.usb_type) elif args.mode == "info": dump_otg_info(args.config) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py index 1c61248574..135dade0c2 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py @@ -17,10 +17,10 @@ def rpyc_client(host, cmd, *args, **kwargs): """ for _ in range(2): try: - conn = rpyc.connect(host, 60000, config={ - "allow_all_attrs": True, - "allow_exposed_attrs": False - } + conn = rpyc.connect( + host, + 60000, + config={"allow_all_attrs": True, "allow_exposed_attrs": False}, ) break except ConnectionRefusedError: @@ -47,4 +47,4 @@ def rpyc_client(host, cmd, *args, **kwargs): except rpyc.core.vinegar.GenericException as exc: raise SystemExit( "Zapper host failed to process the requested command." - ) from exc \ No newline at end of file + ) from exc diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py index 271f876c87..3619162974 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_server.py @@ -9,7 +9,6 @@ from rpyc.utils.server import ThreadedServer - logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) handler_std = logging.StreamHandler(sys.stdout) @@ -19,9 +18,7 @@ ) LIBS = { "enable_serial_server": { - "source": os.path.join( - CHECKBOX_PROVIDER_CEOEM_PATH, "serial_test.py" - ), + "source": os.path.join(CHECKBOX_PROVIDER_CEOEM_PATH, "serial_test.py"), "function": "server_mode", }, "serial_check": { @@ -60,7 +57,9 @@ def wrap(*args, **kwargs): if func.__name__ in dynamic_integrate_funcs: args = args[1:] - with redirect_stderr(sio_stderr) as stderr, redirect_stdout(sio_stdout) as stdout: + with redirect_stderr(sio_stderr) as stderr, redirect_stdout( + sio_stdout + ) as stdout: try: ret = func(*args, **kwargs) except SystemExit as exp: @@ -70,6 +69,7 @@ def wrap(*args, **kwargs): cls.logs = "stdout logs: {}".format(stdout.getvalue()) cls.logs += "\nstderr logs: {}".format(stderr.getvalue()) return ret + return wrap @@ -87,9 +87,7 @@ def _load_method_from_file(name, file, func): def append_method_to_service(cls): for key, value in LIBS.items(): - func = _load_method_from_file( - key, value["source"], value["function"] - ) + func = _load_method_from_file(key, value["source"], value["function"]) if func: setattr(cls, key, capture_io_logs(func)) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py index 44d752690d..93a72e1ff9 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py @@ -45,9 +45,7 @@ def _device_node_detect(func, device_type=""): print(".", end="", flush=True) attempts -= 1 print() - raise SystemExit( - "Failed to detect new {} interface".format(device_type) - ) + raise SystemExit("Failed to detect new {} interface".format(device_type)) def configure_local_network(interface, net_info): From 6e43a9d4e311bf2ecefc2165a53f959526e9f20b Mon Sep 17 00:00:00 2001 From: stanley31huang Date: Fri, 10 Jan 2025 09:51:15 +0800 Subject: [PATCH 16/17] fix black8 issue fix black8 issue --- .../checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py index 2ea85f2558..91bb7c0e29 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py @@ -401,7 +401,6 @@ def main(): else: rs485_settings = None - if args.mode == "server": server_mode( args.node, From e21e641442c0fcd03ba464c9c24d5a4208035a21 Mon Sep 17 00:00:00 2001 From: stanley31huang Date: Fri, 10 Jan 2025 10:00:33 +0800 Subject: [PATCH 17/17] fix flake8 issue fix flake8 issue --- .../checkbox-provider-ce-oem/bin/rpyc_client.py | 2 +- .../checkbox-provider-ce-oem/bin/rpyc_test_methods.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py index 135dade0c2..c678c43221 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_client.py @@ -33,7 +33,7 @@ def rpyc_client(host, cmd, *args, **kwargs): wrap = rpyc.async_(func) res = wrap(*args, **kwargs) while res.ready: - print("Waiting for RPYC server complete %s".format(func)) + print("Waiting for RPYC server complete {}".format(func)) time.sleep(1) break if getattr(res._conn.root, "logs"): diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py index 93a72e1ff9..d22af8d3b6 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpyc_test_methods.py @@ -87,6 +87,8 @@ def usb_storage_test(usb_type): ) watcher.run_storage(mounted_partition) # usb_read_write been imported in run_watcher - # the temporary mount point been created while import it and been removed in gen_random_file function - # thus, we need to reload it to create temporary mount point in every testing cycle + # the temporary mount point been created while import it + # and the directory been removed in gen_random_file function + # thus, we need to reload it to create temporary mount point + # in every testing cycle reload(usb_read_write)