Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ce-oem] Add wifi AP mode test (New) #1606

Merged
merged 34 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
1ce1084
Resolve code conflict while rebasing
rickwu666666 Dec 10, 2024
97b5f1f
Fix black format
rickwu666666 Nov 19, 2024
db6b368
Fix shell check fail
rickwu666666 Nov 19, 2024
ab308d9
Fix flake8
rickwu666666 Nov 19, 2024
e1d35f5
Remove manual test jobs, since those jobs are the same as automated jobs
rickwu666666 Nov 20, 2024
838b76b
Rename --manual arg to --set-ap-only
rickwu666666 Nov 20, 2024
e5602c0
Update contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/wifi_test.py
rickwu666666 Dec 4, 2024
02d3621
Move run_command function out of WiFiManager class
rickwu666666 Dec 3, 2024
7fead1a
Add connect_dut function in WiFiManager class to generate the related…
rickwu666666 Dec 4, 2024
47dd62d
Modify args type/interface/band/channel as required args
rickwu666666 Dec 4, 2024
a423212
Add arg type in the job
rickwu666666 Dec 4, 2024
27d2cfb
Update contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/wifi_test.py
rickwu666666 Dec 10, 2024
a8538b5
Update contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/wifi_test.py
rickwu666666 Dec 10, 2024
9a77b08
Move set_secured and up_conn as part of __enter__ function
rickwu666666 Dec 10, 2024
4396f43
Add log for the running command
rickwu666666 Dec 10, 2024
8e4df5d
Remove --set-ap-only related code
rickwu666666 Dec 11, 2024
f29442a
Modify argparser as two subparser for wifi and wifi-p2p to make comma…
rickwu666666 Dec 11, 2024
da9c11b
Move type wifi initial steps into init_conn function
rickwu666666 Dec 11, 2024
c6f4b96
Add some description of this script about the limitation of network-m…
rickwu666666 Dec 11, 2024
812a99e
Fix argparse issue
rickwu666666 Dec 12, 2024
bd7c1de
Modify test and add a defualt set to test
rickwu666666 Dec 12, 2024
c7944a7
Update contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/wifi_test.py
rickwu666666 Dec 23, 2024
0175bd8
Change the logic of connect DUT from HOST and ping test
rickwu666666 Dec 23, 2024
cce3047
Fix black format
rickwu666666 Dec 23, 2024
9665226
fix error handling for the connect_dut_from_host_via_wifi function
rickwu666666 Dec 24, 2024
66cd32e
change error message to handle by raise SystemError
rickwu666666 Dec 24, 2024
fee3486
Refactor the logic of creating and bringing up the connection from th…
rickwu666666 Dec 24, 2024
7f2c307
Add arg --host-interface to the job, since we add this args in the sc…
rickwu666666 Dec 24, 2024
656bcf4
Fix black format
rickwu666666 Dec 24, 2024
cba8535
Fix issue that connect with psk from HOST
rickwu666666 Dec 24, 2024
6b3ac21
Fix black format
rickwu666666 Dec 24, 2024
57fba1e
Reduce the fail retry to 3 time in 30s
rickwu666666 Dec 25, 2024
0abbda1
Fix black
rickwu666666 Dec 25, 2024
bcbef8b
Merge branch 'main' into add-wifi-ap-mode-test
rickwu666666 Dec 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
386 changes: 386 additions & 0 deletions contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/wifi_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,386 @@
#!/usr/bin/env python3
"""
This script base on network-manager. And network-manager has it own
limitation about wifi ap mode and wifi-p2p.
For wifi ap mode:
Only band a and bg are supported
For wifi-p2p:
We are not able to validate the result even following the user
manual of network-manager.
Please refer to following for more details:
[1] https://networkmanager.dev/docs/api/latest/nm-settings-nmcli.html
[2] https://netplan.readthedocs.io/en/stable/netplan-yaml
[3] https://bugs.launchpad.net/carmel/+bug/2080353/comments/2
"""
import argparse
import subprocess
import sys
import time
import logging
import re
import random
import string
from contextlib import contextmanager

# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


class WiFiManager:
def __init__(self, **kwargs):
"""
Initialize the WiFiManager with dynamic arguments.
kwargs: Dictionary of configuration arguments.
"""
self._command = "nmcli"
self.interface = kwargs.get("interface")
self.type = kwargs.get("type")
self.mode = kwargs.get("mode")
self.band = kwargs.get("band")
self.channel = kwargs.get("channel")
self.key_mgmt = kwargs.get("keymgmt")
self.group = kwargs.get("group")
self.peer = kwargs.get("peer")
self.ssid = kwargs.get("ssid")
self.ssid_pwd = kwargs.get("ssid_pwd")
self.conname = "qa-test-ap"

def init_conn(self):
logging.info("Initializing connection")
if self.type == "wifi":
run_command(
"{} c add type {} ifname {} con-name {} "
"autoconnect no wifi.ssid {} "
"wifi.mode {} ipv4.method shared".format(
self._command,
self.type,
self.interface,
self.conname,
self.ssid,
self.mode,
)
)
self.set_band_channel()
if self.key_mgmt:
self.set_secured()
elif self.type == "wifi-p2p":
run_command(
"{} c add type {} ifname {} con-name {} "
"wifi-p2p.peer {}".format(
self._command,
self.type,
self.interface,
self.conname,
self.peer,
)
)
else:
raise ValueError("Unsupported type: {}".format(self.type))

def set_band_channel(self):
"""
Set band, channel and channel-width.
If channel and channel-width in 0, run command with setting band only.
"""
cmd = "{} c modify {} wifi.band {} ".format(
self._command, self.conname, self.band
)
if self.channel:
cmd += "wifi.channel {} ".format(self.channel)
run_command(cmd)

def set_secured(self):
run_command(
"{} c modify {} wifi-sec.key-mgmt {} wifi-sec.psk {} "
"wifi-sec.group {}".format(
self._command,
self.conname,
self.key_mgmt,
self.ssid_pwd,
self.group,
)
)

def get_ip_addr(self):
"""
nmcli -g IP4.ADDRESS command will return the IPv4 address of the
interface with netmask.
e.g. 10.102.99.224/22
"""
ip_addr = run_command(
"{} -g IP4.ADDRESS device show {}".format(
self._command,
self.interface,
)
)
ip_addr = ip_addr.split("/")[0] if ip_addr.find("/") != -1 else ""
return ip_addr

def up_cmd(self):
return "{} c up {}".format(self._command, self.conname)

def up_conn(self):
try:
run_command(self.up_cmd())
logging.info("Initialized connection successful!")
except Exception:
raise SystemError("Bring up connection failed!")
for i in range(1, 6):
try:
ip_addr = self.get_ip_addr()
if ip_addr:
logging.info("IP address is {}".format(ip_addr))
return True
except subprocess.CalledProcessError:
pass
time.sleep(5)
return False

def del_cmd(self):
return "{} c delete {}".format(self._command, self.conname)

def connect_dut_cmd(self, host_if):
connect_cmd = (
"{} con add type wifi ifname {} con-name {} ssid {}".format(
self._command, host_if, self.conname, self.ssid
)
)
if self.key_mgmt:
connect_cmd += " wifi-sec.key-mgmt {}" " wifi-sec.psk {}".format(
self.key_mgmt, self.ssid_pwd
)
if self.mode == "adhoc":
connect_cmd += " wifi.mode {}".format(self.mode)
return connect_cmd

def __enter__(self):
self.init_conn()
if not self.up_conn():
raise RuntimeError("Connection initialization failed!")
return self

def __exit__(self, exc_type, exc_value, traceback):
logging.info("Exiting context and cleaning up connection")
cmd = self.del_cmd()
run_command(cmd)


def run_command(command):
logging.info("Run command: %s", command)
output = subprocess.check_output(
command, shell=True, text=True, stderr=subprocess.STDOUT
)
return output


def sshpass_cmd_gen(ip, user, pwd, cmd):
return "sshpass -p {} ssh -o StrictHostKeyChecking=no {}@{} {}".format(
pwd, user, ip, cmd
)


def ping_cmd(ip):
return "ping {} -c 4".format(ip)


@contextmanager
def connect_dut_from_host_via_wifi(host_net_info: dict, connect_info: dict):
ip = host_net_info["ip"]
user = host_net_info["user"]
pwd = host_net_info["pwd"]
ssid = connect_info["ssid"]
connect_cmd = connect_info["connect_cmd"]
del_host_conn = connect_info["delete_cmd"]
up_host_conn = connect_info["up_cmd"]
connected = False

logging.info("Pinging target host first...")
try:
run_command(ping_cmd(ip))
logging.info("Ping to target host %s successful.", ip)
except Exception as e:
raise SystemError("Unable to ping the HOST! Error: %s", str(e))
try:
connected = create_conn_from_host(ip, user, pwd, connect_cmd)
if connected:
if bring_up_conn_from_host(ip, user, pwd, up_host_conn):
yield
except Exception as e:
raise SystemError(e)
finally:
if connected:
try:
run_command(sshpass_cmd_gen(ip, user, pwd, del_host_conn))
logging.info("Deleted host connection successfully.")
except Exception as e:
raise SystemError(
"Failed to delete host connection: %s", str(e)
)
else:
raise SystemError(
"Unable to connect to DUT AP SSID %s after 10 attempts.", ssid
)


def create_conn_from_host(ip, user, pwd, connect_cmd):
logging.info("Attempting to create the connection configuration on HOST")
try:
run_command(sshpass_cmd_gen(ip, user, pwd, connect_cmd))
logging.info("Create connection configuration successful!")
return True
except Exception as e:
logging.warning(
"Unable to create connection configuration on HOST. %s", str(e)
)


def bring_up_conn_from_host(ip, user, pwd, up_host_conn):
for i in range(1, 4):
logging.info(
"Attempting to bring up the connection on HOST (%d/%d)...", i, 3
)
try:
run_command(sshpass_cmd_gen(ip, user, pwd, up_host_conn))
logging.info("Bring up connection successful!")
return True
except Exception as e:
logging.warning(
"Unable to bring up connection on HOST. Attempt %d failed."
" Error: %s",
i,
str(e),
)
time.sleep(10)


def ping_test(target_ip, host_net_info: dict):
ip = host_net_info["ip"]
user = host_net_info["user"]
pwd = host_net_info["pwd"]
try:
logging.info("Attempting to ping DUT...")
ping_result = run_command(
sshpass_cmd_gen(ip, user, pwd, ping_cmd(target_ip))
)

packet_loss_match = re.search(r"(\d+)% packet loss", ping_result)
if packet_loss_match:
packet_loss = packet_loss_match.group(1)
logging.info("Packet loss: %s %%", packet_loss)
if packet_loss == "0":
logging.info("Ping DUT passed.")
return 0
else:
logging.error(
"Ping DUT failed with %s %% packet loss!", packet_loss
)
else:
logging.error("Could not parse packet loss from ping result.")

except Exception as e:
logging.error("An error occurred during ping_test: %s", str(e))
return 1


def main():
parser = argparse.ArgumentParser(description="WiFi test")
subparsers = parser.add_subparsers(
dest="type",
required=True,
help="Type of connection. " 'Currentlly support "wifi" and "wifi-p2p"',
)
# Subparser for 'wifi'
wifi_parser = subparsers.add_parser("wifi", help="WiFi configuration")
wifi_parser.add_argument(
"--mode",
choices=["ap", "adhoc"],
required=True,
help="WiFi mode: ap or adhoc",
)
wifi_parser.add_argument("--band", required=True, help="WiFi band to use")
wifi_parser.add_argument(
"--channel", required=True, type=int, help="WiFi channel to use"
)
wifi_parser.add_argument(
"--keymgmt", required=False, help="Key management method"
)
wifi_parser.add_argument(
"--group", required=False, help="Group key management method"
)
wifi_parser.add_argument(
"--ssid",
default="".join(
[random.choice(string.ascii_letters) for _ in range(10)]
),
required=False,
help="SSID for AP mode",
)
wifi_parser.add_argument(
"--ssid-pwd",
required=False,
default="insecure",
help="Password for SSID",
)

# Subparser for 'wifi-p2p'
wifi_p2p_parser = subparsers.add_parser(
"wifi-p2p", help="WiFi P2P configuration"
)
wifi_p2p_parser.add_argument(
"--peer", required=True, help="MAC address for P2P peer"
)
parser.add_argument(
"--interface", required=True, help="WiFi interface to use"
)
parser.add_argument(
"--host-ip",
required=True,
help="IP address of the Host device to connect to."
"The HOST is a device to access the DUT's AP.",
)
parser.add_argument(
"--host-user",
required=True,
help="Username of the Host device for SSH connection",
)
parser.add_argument(
"--host-pwd",
required=True,
help="Password of the Host device for SSH connection",
)
parser.add_argument(
"--host-interface",
required=True,
help="The wifi interface name of the Host device",
)

args = parser.parse_args()
rickwu666666 marked this conversation as resolved.
Show resolved Hide resolved
config = vars(args)
try:
with WiFiManager(**config) as manager:
host_net_info = {
"ip": args.host_ip,
"user": args.host_user,
"pwd": args.host_pwd,
}
connect_info = {
"ssid": args.ssid,
"connect_cmd": manager.connect_dut_cmd(args.host_interface),
"delete_cmd": manager.del_cmd(),
"up_cmd": manager.up_cmd(),
}
with connect_dut_from_host_via_wifi(host_net_info, connect_info):
ret = ping_test(manager.get_ip_addr(), host_net_info)
sys.exit(ret)
except SystemError as e:
logging.error(e)
sys.exit(1)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ nested_part:
before-suspend-ce-oem-spi-automated
ce-oem-gadget-automated
ce-oem-mir-automated
ce-oem-wifi-ap-automated
ce-oem-power-automated-by-pdu
certification_status_overrides:
apply blocker to .*
Expand Down
Loading
Loading