From 6a1a45ede768b7c748b771fe8dd7c6dcef94d716 Mon Sep 17 00:00:00 2001 From: Paolo Gentili Date: Thu, 21 Nov 2024 13:03:09 +0100 Subject: [PATCH] Change: remove explicit Zapper support from run_watcher --- .../checkbox_support/scripts/run_watcher.py | 92 +++++++++---------- .../tests/test_run_watcher.py | 92 +++---------------- .../base/bin/removable_storage_watcher.py | 49 +--------- 3 files changed, 64 insertions(+), 169 deletions(-) diff --git a/checkbox-support/checkbox_support/scripts/run_watcher.py b/checkbox-support/checkbox_support/scripts/run_watcher.py index 2124d8ed21..93b36cb8be 100644 --- a/checkbox-support/checkbox_support/scripts/run_watcher.py +++ b/checkbox-support/checkbox_support/scripts/run_watcher.py @@ -10,7 +10,6 @@ """ import argparse import logging -import os import re import select import sys @@ -19,7 +18,6 @@ from abc import ABC, abstractmethod from checkbox_support.helpers.timeout import timeout -from checkbox_support.scripts.zapper_proxy import zapper_run from checkbox_support.scripts.usb_read_write import ( mount_usb_storage, gen_random_file, @@ -35,6 +33,33 @@ ACTION_TIMEOUT = 30 +class ControllerInterface(ABC): + """ + Perform actions on the device user test. + """ + + @abstractmethod + def action(self, action_type): + """Perform action of type `action_type` on the DUT.""" + + +class ManualController(ControllerInterface): + """ + Interact with the device under test manually. + """ + + def action(self, action_type): + """Perform action of type `action_type` on the DUT.""" + + if action_type == "insertion": + print("\n\nINSERT NOW\n\n", flush=True) + elif action_type == "removal": + print("\n\nREMOVE NOW\n\n", flush=True) + else: + raise SystemExit("Invalid test case") + print("Timeout: {} seconds".format(ACTION_TIMEOUT), flush=True) + + class StorageInterface(ABC): """ StorageInterface makes sure each type of storage class should implement @@ -72,41 +97,21 @@ class StorageWatcher(StorageInterface): function to detect the insertion and removal of storage. """ - def __init__(self, storage_type, zapper_usb_address): + def __init__(self, storage_type, controller=ManualController()): self.storage_type = storage_type - self.zapper_usb_address = zapper_usb_address self.testcase = None self.test_passed = False self.mounted_partition = None + self._controller = controller def run(self): j = journal.Reader() j.seek_realtime(time.time()) p = select.poll() p.register(j, j.get_events()) - if self.zapper_usb_address: - zapper_host = os.environ.get("ZAPPER_HOST") - if not zapper_host: - raise SystemExit("ZAPPER_HOST environment variable not found!") - usb_address = self.zapper_usb_address - if self.testcase == "insertion": - print("Calling zapper to connect the USB device") - zapper_run( - zapper_host, "typecmux_set_state", usb_address, "DUT" - ) - elif self.testcase == "removal": - print("Calling zapper to disconnect the USB device") - zapper_run( - zapper_host, "typecmux_set_state", usb_address, "OFF" - ) - else: - if self.testcase == "insertion": - print("\n\nINSERT NOW\n\n", flush=True) - elif self.testcase == "removal": - print("\n\nREMOVE NOW\n\n", flush=True) - else: - raise SystemExit("Invalid test case") - print("Timeout: {} seconds".format(ACTION_TIMEOUT), flush=True) + + self._controller.action(self.testcase) + while p.poll(): if j.process() != journal.APPEND: continue @@ -168,8 +173,8 @@ class USBStorage(StorageWatcher): USBStorage handles the insertion and removal of usb2 and usb3. """ - def __init__(self, *args): - super().__init__(*args) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) self.mounted_partition = None self.device = None self.number = None @@ -257,8 +262,8 @@ class MediacardStorage(StorageWatcher): MediacardStorage handles the insertion and removal of sd, sdhc, mmc etc... """ - def __init__(self, *args): - super().__init__(*args) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) self.mounted_partition = None self.action = None self.device = None @@ -316,8 +321,8 @@ class MediacardComboStorage(StorageWatcher): etc., for devices that combine mediacard and usb storage. """ - def __init__(self, *args): - super().__init__(*args) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) self.mounted_partition = None self.action = None self.device = None @@ -358,8 +363,8 @@ class ThunderboltStorage(StorageWatcher): storage. """ - def __init__(self, *args): - super().__init__(*args) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) self.mounted_partition = None self.action = None @@ -419,11 +424,6 @@ def parse_args(): ], help=("usb2, usb3, mediacard, mediacard_combo or thunderbolt"), ) - parser.add_argument( - "--zapper-usb-address", - type=str, - help="Zapper's USB switch address to use", - ) return parser.parse_args() @@ -432,17 +432,13 @@ def main(): watcher = None if args.storage_type == "thunderbolt": - watcher = ThunderboltStorage( - args.storage_type, args.zapper_usb_address - ) + watcher = ThunderboltStorage(args.storage_type) elif args.storage_type == "mediacard": - watcher = MediacardStorage(args.storage_type, args.zapper_usb_address) + watcher = MediacardStorage(args.storage_type) elif args.storage_type == "mediacard_combo": - watcher = MediacardComboStorage( - args.storage_type, args.zapper_usb_address - ) + watcher = MediacardComboStorage(args.storage_type) else: - watcher = USBStorage(args.storage_type, args.zapper_usb_address) + watcher = USBStorage(args.storage_type) if args.testcase == "insertion": mounted_partition = watcher.run_insertion() diff --git a/checkbox-support/checkbox_support/tests/test_run_watcher.py b/checkbox-support/checkbox_support/tests/test_run_watcher.py index 865963d4fd..68d6066fb5 100644 --- a/checkbox-support/checkbox_support/tests/test_run_watcher.py +++ b/checkbox-support/checkbox_support/tests/test_run_watcher.py @@ -18,7 +18,7 @@ # along with Checkbox. If not, see . import unittest -from unittest.mock import patch, call, MagicMock, mock_open +from unittest.mock import patch, call, MagicMock import argparse from systemd import journal @@ -26,6 +26,7 @@ from checkbox_support.scripts.run_watcher import ( StorageWatcher, USBStorage, + ManualController, MediacardStorage, MediacardComboStorage, ThunderboltStorage, @@ -47,7 +48,7 @@ def test_storage_watcher_run_with_insertion(self, mock_poll, mock_journal): mock_poll.return_value.poll.side_effect = [True, True, False] mock_storage_watcher = MagicMock() - mock_storage_watcher.zapper_usb_address = "" + mock_storage_watcher._controller = ManualController() # Test insertion mock_storage_watcher.testcase = "insertion" @@ -72,7 +73,7 @@ def test_storage_watcher_run_with_removal(self, mock_poll, mock_journal): mock_poll.return_value.poll.side_effect = [True, False] mock_storage_watcher = MagicMock() - mock_storage_watcher.zapper_usb_address = "" + mock_storage_watcher._controller = ManualController() # Test removal mock_storage_watcher.testcase = "removal" @@ -89,62 +90,12 @@ def test_storage_watcher_run_with_removal(self, mock_poll, mock_journal): def test_storage_watcher_run_invalid_testcase(self): mock_storage_watcher = MagicMock() mock_storage_watcher.testcase = "invalid" - mock_storage_watcher.zapper_usb_address = "" + mock_storage_watcher._controller = ManualController() with self.assertRaises(SystemExit) as cm: StorageWatcher.run(mock_storage_watcher) self.assertEqual(cm.exception.args[0], "Invalid test case") - @patch("systemd.journal.Reader") - @patch("select.poll") - @patch("os.environ.get") - @patch("checkbox_support.scripts.run_watcher.zapper_run") - def test_storage_watcher_run_with_insertion_with_zapper( - self, mock_zapper_run, mock_get, mock_poll, mock_journal - ): - mock_journal.return_value.process.return_value = journal.APPEND - mock_journal.return_value.__iter__.return_value = [ - {"MESSAGE": "line1"} - ] - mock_poll.return_value.poll.side_effect = [True, False] - mock_get.return_value = "zapper_addr" - - mock_storage_watcher = MagicMock() - mock_storage_watcher.zapper_usb_address = "usb_address" - - # Test insertion with zapper - mock_storage_watcher.testcase = "insertion" - StorageWatcher.run(mock_storage_watcher) - mock_zapper_run.assert_called_with( - "zapper_addr", "typecmux_set_state", "usb_address", "DUT" - ) - mock_storage_watcher._process_lines.assert_called_with(["line1"]) - - @patch("systemd.journal.Reader") - @patch("select.poll") - @patch("os.environ.get") - @patch("checkbox_support.scripts.run_watcher.zapper_run") - def test_storage_watcher_run_with_removal_with_zapper( - self, mock_zapper_run, mock_get, mock_poll, mock_journal - ): - mock_journal.return_value.process.return_value = journal.APPEND - mock_journal.return_value.__iter__.return_value = [ - {"MESSAGE": "line1"} - ] - mock_poll.return_value.poll.side_effect = [True, False] - mock_get.return_value = "zapper_addr" - - mock_storage_watcher = MagicMock() - mock_storage_watcher.zapper_usb_address = "usb_address" - - # Test removal with zapper - mock_storage_watcher.testcase = "removal" - StorageWatcher.run(mock_storage_watcher) - mock_zapper_run.assert_called_with( - "zapper_addr", "typecmux_set_state", "usb_address", "OFF" - ) - mock_storage_watcher._process_lines.assert_called_with(["line1"]) - @patch("systemd.journal.Reader") @patch("select.poll") def test_storage_watcher_run_not_passed(self, mock_poll, mock_journal): @@ -155,7 +106,6 @@ def test_storage_watcher_run_not_passed(self, mock_poll, mock_journal): mock_poll.return_value.poll.side_effect = [True, False] mock_storage_watcher = MagicMock() - mock_storage_watcher.zapper_usb_address = "" mock_storage_watcher.test_passed = False # Test not passed @@ -241,9 +191,8 @@ def test_storage_watcher_run_storage( mock_read_test.assert_called_with("file") def test_usb_storage_init(self): - usb_storage = USBStorage("usb2", "zapper_addr") + usb_storage = USBStorage("usb2") self.assertEqual(usb_storage.storage_type, "usb2") - self.assertEqual(usb_storage.zapper_usb_address, "zapper_addr") self.assertIsNone(usb_storage.mounted_partition) self.assertIsNone(usb_storage.device) self.assertIsNone(usb_storage.number) @@ -367,9 +316,8 @@ def test_usb_storage_parse_journal_line(self): self.assertEqual(mock_usb_storage.action, None) def test_mediacard_storage_init(self): - mediacard_storage = MediacardStorage("mediacard", "zapper_addr") + mediacard_storage = MediacardStorage("mediacard") self.assertEqual(mediacard_storage.storage_type, "mediacard") - self.assertEqual(mediacard_storage.zapper_usb_address, "zapper_addr") self.assertIsNone(mediacard_storage.mounted_partition) def test_mediacard_storage_validate_insertion(self): @@ -425,13 +373,8 @@ def test_mediacard_storage_parse_journal_line(self): self.assertEqual(mock_mediacard_storage.action, None) def test_mediacard_combo_storage_init(self): - mediacard_combo_storage = MediacardComboStorage( - "mediacard", "zapper_addr" - ) + mediacard_combo_storage = MediacardComboStorage("mediacard") self.assertEqual(mediacard_combo_storage.storage_type, "mediacard") - self.assertEqual( - mediacard_combo_storage.zapper_usb_address, "zapper_addr" - ) self.assertIsNone(mediacard_combo_storage.mounted_partition) def test_mediacard_combo_storage_validate_insertion(self): @@ -510,9 +453,8 @@ def test_mediacard_combo_storage_parse_journal_line(self): self.assertEqual(mock_mediacard_combo_storage.action, None) def test_thunderbolt_storage_init(self): - thunderbolt_storage = ThunderboltStorage("thunderbolt", "zapper_addr") + thunderbolt_storage = ThunderboltStorage("thunderbolt") self.assertEqual(thunderbolt_storage.storage_type, "thunderbolt") - self.assertEqual(thunderbolt_storage.zapper_usb_address, "zapper_addr") self.assertIsNone(thunderbolt_storage.mounted_partition) self.assertIsNone(thunderbolt_storage.action) @@ -577,21 +519,20 @@ def test_thunderbolt_storage_parse_journal_line(self): def test_parse_args(self): with patch( "sys.argv", - ["script.py", "insertion", "usb2", "--zapper-usb-address", "addr"], + ["script.py", "insertion", "usb2"], ): args = parse_args() self.assertEqual(args.testcase, "insertion") self.assertEqual(args.storage_type, "usb2") - self.assertEqual(args.zapper_usb_address, "addr") @patch("checkbox_support.scripts.run_watcher.USBStorage", spec=USBStorage) @patch("checkbox_support.scripts.run_watcher.parse_args") def test_main_usb_insertion(self, mock_parse_args, mock_usb): mock_parse_args.return_value = argparse.Namespace( - testcase="insertion", storage_type="usb2", zapper_usb_address=None + testcase="insertion", storage_type="usb2" ) main() - mock_usb.assert_called_with("usb2", None) + mock_usb.assert_called_with("usb2") self.assertEqual(mock_usb.return_value.run_insertion.call_count, 1) self.assertEqual(mock_usb.return_value.run_removal.call_count, 1) # get the watcher object from main @@ -604,10 +545,10 @@ def test_main_usb_insertion(self, mock_parse_args, mock_usb): @patch("checkbox_support.scripts.run_watcher.parse_args") def test_main_usb_storage(self, mock_parse_args, mock_usb): mock_parse_args.return_value = argparse.Namespace( - testcase="storage", storage_type="usb2", zapper_usb_address=None + testcase="storage", storage_type="usb2" ) main() - mock_usb.assert_called_with("usb2", None) + mock_usb.assert_called_with("usb2") self.assertEqual(mock_usb.return_value.run_insertion.call_count, 1) self.assertEqual(mock_usb.return_value.run_storage.call_count, 1) self.assertEqual(mock_usb.return_value.run_removal.call_count, 1) @@ -616,7 +557,7 @@ def test_main_usb_storage(self, mock_parse_args, mock_usb): @patch("checkbox_support.scripts.run_watcher.parse_args") def test_main_usb_invalid(self, mock_parse_args, mock_usb): mock_parse_args.return_value = argparse.Namespace( - testcase="invalid", storage_type="usb2", zapper_usb_address=None + testcase="invalid", storage_type="usb2" ) with self.assertRaises(SystemExit) as cm: main() @@ -631,7 +572,6 @@ def test_main_mediacard(self, mock_parse_args, mock_mediacard): mock_parse_args.return_value = argparse.Namespace( testcase="insertion", storage_type="mediacard", - zapper_usb_address=None, ) main() self.assertEqual(mock_mediacard.call_count, 1) @@ -649,7 +589,6 @@ def test_main_mediacard_combo(self, mock_parse_args, mock_mediacard): mock_parse_args.return_value = argparse.Namespace( testcase="insertion", storage_type="mediacard_combo", - zapper_usb_address=None, ) main() self.assertEqual(mock_mediacard.call_count, 1) @@ -667,7 +606,6 @@ def test_main_thunderbolt(self, mock_parse_args, mock_thunderbolt): mock_parse_args.return_value = argparse.Namespace( testcase="insertion", storage_type="thunderbolt", - zapper_usb_address=None, ) main() self.assertEqual(mock_thunderbolt.call_count, 1) diff --git a/providers/base/bin/removable_storage_watcher.py b/providers/base/bin/removable_storage_watcher.py index d10999c598..9edd2857ee 100755 --- a/providers/base/bin/removable_storage_watcher.py +++ b/providers/base/bin/removable_storage_watcher.py @@ -5,9 +5,7 @@ import copy import dbus import logging -import os import sys -import threading import gi @@ -26,7 +24,6 @@ from checkbox_support.parsers.udevadm import CARD_READER_RE # noqa: E402 from checkbox_support.parsers.udevadm import GENERIC_RE # noqa: E402 from checkbox_support.parsers.udevadm import FLASH_RE # noqa: E402 -from checkbox_support.scripts.zapper_proxy import zapper_run # noqa: E402 from checkbox_support.udev import get_interconnect_speed # noqa: E402 from checkbox_support.udev import get_udev_block_devices # noqa: E402 @@ -1048,11 +1045,6 @@ def main(): action="store_true", help="Don't require drive being automounted", ) - parser.add_argument( - "--zapper-usb-address", - type=str, - help="Zapper's USB switch address to use", - ) parser.set_defaults(logging_level=logging.WARNING) args = parser.parse_args() @@ -1097,42 +1089,11 @@ def main(): ) # Run the actual listener and wait till it either times out of discovers # the appropriate media changes - if args.zapper_usb_address: - zapper_host = os.environ.get("ZAPPER_HOST") - if not zapper_host: - raise SystemExit("ZAPPER_HOST environment variable not found!") - usb_address = args.zapper_usb_address - delay = 5 # in seconds - - def do_the_insert(): - logging.info("Calling zapper to connect the USB device") - zapper_run(zapper_host, "typecmux_set_state", usb_address, "DUT") - - insert_timer = threading.Timer(delay, do_the_insert) - - def do_the_remove(): - logging.info("Calling zapper to disconnect the USB device") - zapper_run(zapper_host, "typecmux_set_state", usb_address, "OFF") - - remove_timer = threading.Timer(delay, do_the_remove) - if args.action == "insert": - logging.info("Starting timer for delayed insertion") - insert_timer.start() - elif args.action == "remove": - logging.info("Starting timer for delayed removal") - remove_timer.start() - try: - res = listener.check(args.timeout) - return res - except KeyboardInterrupt: - return 1 - - else: - print("\n\n{} NOW\n\n".format(args.action.upper()), flush=True) - try: - return listener.check(args.timeout) - except KeyboardInterrupt: - return 1 + print("\n\n{} NOW\n\n".format(args.action.upper()), flush=True) + try: + return listener.check(args.timeout) + except KeyboardInterrupt: + return 1 if __name__ == "__main__":