Skip to content

Commit

Permalink
Add: implement the RPMSG-tty tests
Browse files Browse the repository at this point in the history
implement the RPMSG-tty tests
  • Loading branch information
stanley31huang committed Dec 5, 2023
1 parent 4734a51 commit 2aa5241
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 62 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/tox.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ jobs:
python-version: ${{ matrix.python }}
- name: Install tox and otehr dependencies
run: |
python -m pip install --upgrade pip
pip install tox
python3 -m pip install --upgrade pip
pip3 install tox
- name: Run tox
working-directory: /home/runner/checkbox/providers/checkbox-provider-ce-oem/
run: tox
run: tox -e py${{ matrix.python }}
126 changes: 106 additions & 20 deletions checkbox-provider-ce-oem/bin/rpmsg_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,43 @@
import argparse
import datetime
import os
import sys
import re
import subprocess
import select
import logging
from systemd import journal
from serial_test import serial_init, client_mode


def init_logger():
"""
Set the logger to log DEBUG and INFO to stdout, and
WARNING, ERROR, CRITICAL to stderr.
"""
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
logger_format = "%(asctime)s %(levelname)-8s %(message)s"
date_format = "%Y-%m-%d %H:%M:%S"

# Log DEBUG and INFO to stdout, others to stderr
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(logging.Formatter(logger_format, date_format))

stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setFormatter(logging.Formatter(logger_format, date_format))

stdout_handler.setLevel(logging.DEBUG)
stderr_handler.setLevel(logging.WARNING)

# Add a filter to the stdout handler to limit log records to
# INFO level and below
stdout_handler.addFilter(lambda record: record.levelno <= logging.INFO)

root_logger.addHandler(stderr_handler)
root_logger.addHandler(stdout_handler)

return root_logger


RPMSG_ROOT = "/sys/bus/rpmsg/devices"
Expand All @@ -23,13 +56,13 @@ def check_rpmsg_device():
Returns:
rpmsg_devices (list): a list of RPMSG device path
"""
print("## Checking RPMSG device is available ...")
logging.info("## Checking RPMSG device is available ...")

rpmsg_devices = os.listdir(RPMSG_ROOT)
if not rpmsg_devices:
raise SystemExit("RPMSG device is not available")
else:
print("RPMSG device is available")
logging.info("RPMSG device is available")

return rpmsg_devices

Expand All @@ -44,7 +77,7 @@ def get_rpmsg_channel():
Returns:
rpmsg_channels (list): a list of RPMSG destination channel
"""
print("## Checking RPMSG channel ...")
logging.info("## Checking RPMSG channel ...")

rpmsg_channels = []
rpmsg_devices = check_rpmsg_device()
Expand All @@ -55,7 +88,7 @@ def get_rpmsg_channel():
rpmsg_channels.append(fp.read().strip("\n"))

if rpmsg_channels:
print("Available RPMSG channels is {}".format(rpmsg_channels))
logging.info("Available RPMSG channels is %s", rpmsg_channels)
else:
raise SystemExit("RPMSG channel is not created")

Expand All @@ -75,7 +108,7 @@ def get_soc_family():
with open(path, "r") as fp:
soc_family = fp.read().strip()

print("SoC family is {}".format(soc_family))
logging.info("SoC family is %s", soc_family)
return soc_family


Expand All @@ -92,7 +125,7 @@ def get_soc_machine():
with open(path, "r") as fp:
soc_machine = fp.read().strip()

print("SoC machine is {}".format(soc_machine))
logging.info("SoC machine is %s", soc_machine)
return soc_machine


Expand All @@ -105,7 +138,7 @@ def detect_arm_processor_type():
"""
family = get_soc_family()
machine = get_soc_machine()
print("SoC family is {}, machine is {}".format(family, machine))
logging.info("SoC family is %s, machine is %s", family, machine)

if "i.MX" in family or "i.MX" in machine:
arm_cpu_type = "imx"
Expand All @@ -125,7 +158,7 @@ def pingpong_test(cpu_type):
SystemExit: if ping pong event count is not expected
"""

print("## Probe pingpong kernel module")
logging.info("## Probe pingpong kernel module")
if cpu_type == "imx":
kernel_module = "imx_rpmsg_pingpong"
probe_cmd = "modprobe {}".format(kernel_module)
Expand Down Expand Up @@ -160,8 +193,8 @@ def pingpong_test(cpu_type):
poll.register(log_reader, log_reader.get_events())

start_time = datetime.datetime.now()
print("# start time: {}".format(start_time))
print("# probe pingpong module with '{}'".format(probe_cmd))
logging.info("# start time: %s", start_time)
logging.info("# probe pingpong module with '%s'", probe_cmd)
try:
subprocess.run(probe_cmd, shell=True)
except subprocess.CalledProcessError:
Expand All @@ -174,7 +207,7 @@ def pingpong_test(cpu_type):
continue

for entry in log_reader:
print(entry["MESSAGE"])
logging.info(entry["MESSAGE"])
if entry["MESSAGE"] == "":
continue

Expand All @@ -191,29 +224,82 @@ def pingpong_test(cpu_type):
if needed_break:
break

print("## Check Ping pong test is finish")

logging.info("## Check Ping pong test is finish")
if len(pingpong_events) != expected_count:
print(
"ping-pong count is not match. expected {}, actual: {}".format(
expected_count, len(pingpong_events))
)
logging.info(
"ping-pong count is not match. expected %s, actual: %s",
expected_count, len(pingpong_events))
raise SystemExit("The ping-pong message is not match.")
else:
print("ping-pong logs count is match")
logging.info("ping-pong logs count is match")


def serial_tty_test(cpu_type, data_size):
"""
Probe rpmsg-tty kernel module for RPMSG TTY test
Raises:
SystemExit: in following condition
- CPU type is not supported or
- RPMSG TTY device is not exists or
- no data received from serial device
- received data not match
"""
def __check_rpmsg_devices():
rpmsg_devices = [
os.sep.join([root, dev]) for dev in os.listdir(root)
if re.search(check_pattern, dev)
]
return rpmsg_devices

logging.info("## Probe rpmsg-tty kernel module")
if cpu_type == "imx":
kernel_module = "imx_rpmsg_tty"
probe_cmd = "modprobe {}".format(kernel_module)
check_pattern = r"ttyRPMSG[0-9]+"
elif cpu_type == "ti":
# Not tested on TI platform yet
raise SystemExit("Unsupported method for TI.")

kernel_module = "rpmsg_pru"
probe_cmd = "modprobe {}".format(kernel_module)
check_pattern = r"rpmsg_pru[0-9]+"
else:
raise SystemExit("Unexpected CPU type.")

root = "/dev"
rpmsg_devs = __check_rpmsg_devices()
if not rpmsg_devs:
start_time = datetime.datetime.now()
logging.info("# start time: %s", start_time)
logging.info("# probe RPMSG tty module with '%s'", probe_cmd)
try:
subprocess.run(probe_cmd, shell=True)
except subprocess.CalledProcessError:
pass

rpmsg_devs = __check_rpmsg_devices()

if rpmsg_devs:
client_mode(serial_init(rpmsg_devs[0]), data_size)
else:
raise SystemExit("No RPMSG TTY devices found.")


def main():
parser = argparse.ArgumentParser(description='RPMSG related test')
parser.add_argument('--type',
help='To filter out PKCS11 for the suite',
choices=["detect", "pingpong"])
help='RPMSG tests',
choices=["detect", "pingpong", "serial-tty"])
args = parser.parse_args()
init_logger()

if args.type == "detect":
check_rpmsg_device()
elif args.type == "pingpong":
pingpong_test(detect_arm_processor_type())
elif args.type == "serial-tty":
serial_tty_test(detect_arm_processor_type(), 1024)


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,62 @@
all rs485/232/422 that on DUT to the server(RPi 3). And test the
port on DUT.
"""


import sys
import argparse
import serial
import time
import string
import random
import logging

logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
# logging.FileHandler("rs485-stress.log"),
logging.StreamHandler(),
],
)

def init_logger():
"""
Set the logger to log DEBUG and INFO to stdout, and
WARNING, ERROR, CRITICAL to stderr.
"""
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
logger_format = "%(asctime)s %(levelname)-8s %(message)s"
date_format = "%Y-%m-%d %H:%M:%S"

# Log DEBUG and INFO to stdout, others to stderr
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(logging.Formatter(logger_format, date_format))

stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setFormatter(logging.Formatter(logger_format, date_format))

stdout_handler.setLevel(logging.DEBUG)
stderr_handler.setLevel(logging.WARNING)

# Add a filter to the stdout handler to limit log records to
# INFO level and below
stdout_handler.addFilter(lambda record: record.levelno <= logging.INFO)

root_logger.addHandler(stderr_handler)
root_logger.addHandler(stdout_handler)

return root_logger


def str_generator(size):
chars = []
chars.extend(string.ascii_uppercase)
chars.extend(string.ascii_lowercase)
chars.extend(string.digits)
chars.extend(string.punctuation)

def str_generator(
size,
chars=string.ascii_uppercase
+ string.digits
+ string.ascii_lowercase
+ string.punctuation,
):
return "".join(random.choice(chars) for _ in range(size))
return "".join(random.choices(chars, k=size))


def serial_init(args):
def serial_init(device, **kwargs):
ser = serial.Serial(
args.device,
baudrate=args.baudrate,
bytesize=args.bytesize,
parity=args.parity,
stopbits=args.stopbits,
device,
baudrate=kwargs.get("baudrate", 115200),
bytesize=kwargs.get("bytesize", 8),
parity=kwargs.get("parity", "N"),
stopbits=kwargs.get("stopbits", 1),
timeout=1,
write_timeout=1,
xonxoff=True
Expand Down Expand Up @@ -80,26 +100,26 @@ def receiver(ser):
return rcv


def server_mode(ser, args):
def server_mode(ser):
"""
Running as a server, it will be sniffing for received string.
And it will send the same string out.
usage:
running on port /dev/ttyUSB0 as a server
$ sudo ./rs485-remote.py /dev/ttyUSB0 --mode server
"""
logging.info("Listening on port {} ...".format(args.device))
logging.info("Listening on port {} ...".format(ser._port))
while True:
re_string = receiver(ser)
if re_string:
time.sleep(3)
logging.info("Send string back ...")
sender(ser, re_string)
logging.info("Listening on port {} ...".format(args.device))
logging.info("Listening on port {} ...".format(ser._port))
ser.reset_input_buffer()


def client_mode(ser, args):
def client_mode(ser, data_length):
"""
Running as a clinet and it will sending out a string and wait
the string send back from server. After receive the string,
Expand All @@ -108,19 +128,17 @@ def client_mode(ser, args):
running on port /dev/ttymxc1 as a client
$ sudo ./rs485-remotr.py /dev/ttymxc1 --mode client
"""
test_str = "{}-{}".format(args.device, str_generator(args.size))
test_str = "{}-{}".format(ser._port, str_generator(data_length))
sender(ser, test_str)
for i in range(1, 6):
logging.info("Attempting receive string... {} time".format(i))
time.sleep(3)
readback = receiver(ser)
if readback:
if readback == test_str:
logging.info("Expect: {}".format(test_str))
logging.info("Received string is correct!")
raise SystemExit(0)
else:
logging.info("Expect: {}".format(test_str))
logging.error("Received string is incorrect!")
raise SystemExit(1)
logging.error("Not able to receive string!!")
Expand Down Expand Up @@ -161,11 +179,19 @@ def main():
default=1,
)
args = parser.parse_args()
ser = serial_init(args)
init_logger()
ser = serial_init(
args.device,
baudrate=args.baudrate,
bytesize=args.bytesize,
parity=args.parity,
stopbits=args.stopbits,
)

if args.mode == "server":
server_mode(ser, args)
server_mode(ser)
else:
client_mode(ser, args)
client_mode(ser, args.size)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit 2aa5241

Please sign in to comment.