From 75f04787ae4fdc676fc268ccd56884230783e7fb Mon Sep 17 00:00:00 2001 From: Konstantin Selyunin Date: Sat, 30 Nov 2024 11:30:35 +0100 Subject: [PATCH] :sparkles: add BMI323 [#7] Add support for BMI323. Add examples for BMI323. Move register addresses to enum classes. This improves auto-completion and structure of the sensor class. Add unit tests. --- examples/README.md | 17 + examples/bmi323/__init__.py | 0 .../bmi323/bmi323_i2c_interrupt_streaming.py | 46 ++ .../bmi323/bmi323_i2c_polling_streaming.py | 48 ++ examples/bmi323/bmi323_i2c_read_write.py | 59 +++ .../bmi323/bmi323_spi_interrupt_streaming.py | 49 ++ .../bmi323/bmi323_spi_polling_streaming.py | 51 ++ examples/bmi323/bmi323_spi_read_write.py | 68 +++ src/umrx_app_v3/sensors/bmi088.py | 247 ++++----- src/umrx_app_v3/sensors/bmi323.py | 480 ++++++++++++++++++ .../shuttle_board/bmi088/bmi088_shuttle.py | 12 +- .../shuttle_board/bmi323/__init__.py | 0 .../shuttle_board/bmi323/bmi323_shuttle.py | 240 +++++++++ tests/conftest.py | 18 + tests/sensors/__init__.py | 0 tests/sensors/test_bmi088.py | 48 ++ tests/sensors/test_bmi323.py | 30 ++ 17 files changed, 1290 insertions(+), 123 deletions(-) create mode 100644 examples/bmi323/__init__.py create mode 100644 examples/bmi323/bmi323_i2c_interrupt_streaming.py create mode 100644 examples/bmi323/bmi323_i2c_polling_streaming.py create mode 100644 examples/bmi323/bmi323_i2c_read_write.py create mode 100644 examples/bmi323/bmi323_spi_interrupt_streaming.py create mode 100644 examples/bmi323/bmi323_spi_polling_streaming.py create mode 100644 examples/bmi323/bmi323_spi_read_write.py create mode 100644 src/umrx_app_v3/sensors/bmi323.py create mode 100644 src/umrx_app_v3/shuttle_board/bmi323/__init__.py create mode 100644 src/umrx_app_v3/shuttle_board/bmi323/bmi323_shuttle.py create mode 100644 tests/sensors/__init__.py create mode 100644 tests/sensors/test_bmi088.py create mode 100644 tests/sensors/test_bmi323.py diff --git a/examples/README.md b/examples/README.md index cdb7ecc..b6b0629 100644 --- a/examples/README.md +++ b/examples/README.md @@ -5,6 +5,9 @@ These are self-contained examples for using the `umrx-v3-py` with Application Bo * [`switch_app_dfu.py`](./switch_app_dfu.py) * [`switch_app_mtp.py`](./switch_app_mtp.py) + +## [`bmi088`](https://www.bosch-sensortec.com/products/motion-sensors/imus/bmi088/) + The examples in the [`bmi088`](./bmi088) folder show different communication features for the [BMI088 shuttle](https://www.bosch-sensortec.com/media/boschsensortec/downloads/shuttle_board_flyer/application_board_3_1/bst-bmi088-sf000.pdf) @@ -17,6 +20,20 @@ board: * [`bmi088/bmi088_spi_polling_streaming.py`](./bmi088/bmi088_spi_polling_streaming.py) * [`bmi088/bmi088_spi_interrupt_streaming.py`](./bmi088/bmi088_spi_interrupt_streaming.py) +## [`bmi323`](https://www.bosch-sensortec.com/products/motion-sensors/imus/bmi323/) + +The examples in the [`bmi323`](./bmi323) folder +show different communication features for the +[BMI323 shuttle](https://www.bosch-sensortec.com/media/boschsensortec/downloads/shuttle_board_flyer/bst-bmi323-sf000.pdf) +board: + +* [`bmi088/bmi323_i2c_read_write.py`](./bmi323/bmi323_i2c_read_write.py) +* [`bmi088/bmi323_i2c_polling_streaming.py`](./bmi323/bmi323_i2c_polling_streaming.py) +* [`bmi088/bmi323_i2c_interrupt_streaming.py`](./bmi323/bmi323_i2c_interrupt_streaming.py) +* [`bmi088/bmi323_spi_read_write.py`](./bmi323/bmi323_spi_read_write.py) +* [`bmi088/bmi323_spi_polling_streaming.py`](./bmi323/bmi323_spi_polling_streaming.py) +* [`bmi088/bmi323_spi_interrupt_streaming.py`](./bmi323/bmi323_spi_interrupt_streaming.py) + ## Need a specific example or do not know how to read data from your sensor? diff --git a/examples/bmi323/__init__.py b/examples/bmi323/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/bmi323/bmi323_i2c_interrupt_streaming.py b/examples/bmi323/bmi323_i2c_interrupt_streaming.py new file mode 100644 index 0000000..fdd4e7c --- /dev/null +++ b/examples/bmi323/bmi323_i2c_interrupt_streaming.py @@ -0,0 +1,46 @@ +import logging +import struct +import sys +import time +from pathlib import Path + +from umrx_app_v3.shuttle_board.bmi323.bmi323_shuttle import BMI323Shuttle + + +def setup_logging(level: int = logging.DEBUG) -> logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + # This example is for Application Board 3.1 hardware + shuttle = BMI323Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_i2c() + logger.info(f"acc_chip_id=0x{shuttle.sensor.chip_id:02X}") + assert shuttle.sensor.chip_id == 0x0043 + shuttle.configure_interrupt_streaming() + shuttle.start_streaming() + time.sleep(0.1) + for idx in range(1000): + for streaming in shuttle.board.receive_interrupt_streaming_multiple(includes_mcu_timestamp=False): + sensor_id, packet, time_stamp, payload = streaming + d_x, d_y, d_z = struct.unpack(" logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + # This example is for Application Board 3.1 hardware + shuttle = BMI323Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_i2c() + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:04X}") + assert shuttle.sensor.chip_id == 0x0043 + shuttle.configure_polling_streaming() + shuttle.start_streaming() + time.sleep(0.1) + for idx in range(1000): + for streaming in shuttle.board.receive_polling_streaming_multiple(): + sensor_id, payload = streaming + a_x, a_y, a_z, g_x, g_y, g_z, temperature, time_stamp = struct.unpack(" logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + shuttle = BMI323Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_i2c() + + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:04X}") + logger.info(f"err_reg=0x{shuttle.sensor.err_reg:04X}") + logger.info(f"err_reg=0x{shuttle.sensor.status:04X}") + + a_x, a_y, a_z = shuttle.sensor.acc_data + logger.info(f"acceleration=({a_x=:04X}, {a_y=:04X}, {a_z=:04X})") + + g_x, g_y, g_z = shuttle.sensor.gyr_data + logger.info(f"angular_velocity=({g_x=:04X}, {g_y=:04X}, {g_z=:04X})") + + logger.info(f"sensor_time={shuttle.sensor.sensor_time:04X}") + + logger.info(f"acc_conf=0x{shuttle.sensor.acc_conf:04X}") + logger.info(f"gyr_conf=0x{shuttle.sensor.gyr_conf:04X}") + + shuttle.sensor.acc_conf = 0x4027 + shuttle.sensor.gyr_conf = 0x404B + + logger.info(f"acc_conf=0x{shuttle.sensor.acc_conf:04X}") + logger.info(f"gyr_conf=0x{shuttle.sensor.gyr_conf:04X}") + + logger.info(f"alt_acc_conf=0x{shuttle.sensor.alt_acc_conf:04X}") + logger.info(f"alt_gyr_conf=0x{shuttle.sensor.alt_gyr_conf:04X}") + time.sleep(0.1) + a_x, a_y, a_z = shuttle.sensor.acc_data + logger.info(f"acceleration=({a_x=}, {a_y=}, {a_z=})") + + g_x, g_y, g_z = shuttle.sensor.gyr_data + logger.info(f"angular_velocity=({g_x=}, {g_y=}, {g_z=})") diff --git a/examples/bmi323/bmi323_spi_interrupt_streaming.py b/examples/bmi323/bmi323_spi_interrupt_streaming.py new file mode 100644 index 0000000..05a7c28 --- /dev/null +++ b/examples/bmi323/bmi323_spi_interrupt_streaming.py @@ -0,0 +1,49 @@ +import logging +import struct +import sys +import time +from pathlib import Path + +from umrx_app_v3.shuttle_board.bmi323.bmi323_shuttle import BMI323Shuttle + + +def setup_logging(level: int = logging.DEBUG) -> logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + # This example is for Application Board 3.1 hardware + shuttle = BMI323Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_spi() + _ = shuttle.board.read_spi(shuttle.CS, 0, 1) # dummy read is required, do not delete + + logger.info(f"acc_chip_id=0x{shuttle.sensor.chip_id:04X}") + assert shuttle.sensor.chip_id == 0x0043 + shuttle.configure_interrupt_streaming() + shuttle.start_streaming() + time.sleep(0.1) + for idx in range(1000): + for streaming in shuttle.board.receive_interrupt_streaming_multiple(includes_mcu_timestamp=False): + sensor_id, packet, time_stamp, payload = streaming + if sensor_id == 1: + a_x, a_y, a_z = struct.unpack(" logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + # This example is for Application Board 3.1 hardware + shuttle = BMI323Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_spi() + _ = shuttle.board.read_spi(shuttle.CS, 0, 1) # dummy read is required, do not delete + + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:02X}") + assert shuttle.sensor.chip_id == 0x0043 + shuttle.configure_polling_streaming() + shuttle.start_streaming() + time.sleep(0.1) + for idx in range(1000): + for streaming in shuttle.board.receive_polling_streaming_multiple(): + sensor_id, payload = streaming + a_x, a_y, a_z, g_x, g_y, g_z, temperature, time_stamp = struct.unpack(" logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + # For using with Application board 3.0, replace the line below with: + # shuttle = BMI088Shuttle.on_hardware_v3_rev0() + shuttle = BMI323Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_spi() + _ = shuttle.board.read_spi(shuttle.CS, 0, 1) # dummy read is required, do not delete + + res = shuttle.board.read_spi(shuttle.CS, 0, 3) + logger.info(f"{res=}") + + res = shuttle.board.read_spi(shuttle.CS, 0, 3) + logger.info(f"{res=}") + + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:04X}") + logger.info(f"err_reg=0x{shuttle.sensor.err_reg:04X}") + logger.info(f"err_reg=0x{shuttle.sensor.status:04X}") + + a_x, a_y, a_z = shuttle.sensor.acc_data + logger.info(f"acceleration=({a_x=:04X}, {a_y=:04X}, {a_z=:04X})") + + g_x, g_y, g_z = shuttle.sensor.gyr_data + logger.info(f"angular_velocity=({g_x=:04X}, {g_y=:04X}, {g_z=:04X})") + + logger.info(f"sensor_time={shuttle.sensor.sensor_time:04X}") + + logger.info(f"acc_conf=0x{shuttle.sensor.acc_conf:04X}") + logger.info(f"gyr_conf=0x{shuttle.sensor.gyr_conf:04X}") + # start accelerometer and gyro measurements + shuttle.sensor.acc_conf = 0x4027 + shuttle.sensor.gyr_conf = 0x404B + + logger.info(f"acc_conf=0x{shuttle.sensor.acc_conf:04X}") + logger.info(f"gyr_conf=0x{shuttle.sensor.gyr_conf:04X}") + + logger.info(f"alt_acc_conf=0x{shuttle.sensor.alt_acc_conf:04X}") + logger.info(f"alt_gyr_conf=0x{shuttle.sensor.alt_gyr_conf:04X}") + time.sleep(0.1) + a_x, a_y, a_z = shuttle.sensor.acc_data + logger.info(f"acceleration=({a_x=}, {a_y=}, {a_z=})") + + g_x, g_y, g_z = shuttle.sensor.gyr_data + logger.info(f"angular_velocity=({g_x=}, {g_y=}, {g_z=})") diff --git a/src/umrx_app_v3/sensors/bmi088.py b/src/umrx_app_v3/sensors/bmi088.py index 40afcb3..f8dffba 100644 --- a/src/umrx_app_v3/sensors/bmi088.py +++ b/src/umrx_app_v3/sensors/bmi088.py @@ -1,64 +1,69 @@ import struct from collections.abc import Callable +from enum import Enum + + +class BMI088GyroAddr(Enum): + gyro_chip_id = 0x00 + gyro_rate_x_lsb = 0x02 + gyro_rate_x_msb = 0x03 + gyro_rate_y_lsb = 0x04 + gyro_rate_y_msb = 0x05 + gyro_rate_z_lsb = 0x06 + gyro_rate_z_msb = 0x07 + gyro_int_stat_1 = 0x0A + gyro_fifo_status = 0x0E + gyro_range = 0x0F + gyro_bandwidth = 0x10 + gyro_lpm1 = 0x11 + gyro_soft_reset = 0x14 + gyro_int_ctrl = 0x15 + gyro_int3_int4_io_conf = 0x16 + gyro_int3_int4_io_map = 0x18 + gyro_fifo_wm_en = 0x1E + gyro_fifo_ext_int_s = 0x34 + gyro_self_test = 0x3C + gyro_fifo_config_0 = 0x3D + gyro_fifo_config_1 = 0x3E + gyro_fifo_data = 0x3F + + +class BMI088AccelAddr(Enum): + acc_chip_id = 0x00 + acc_err_reg = 0x02 + acc_status = 0x03 + acc_x_lsb = 0x12 + acc_x_msb = 0x13 + acc_y_lsb = 0x14 + acc_y_msb = 0x15 + acc_z_lsb = 0x16 + acc_z_msb = 0x17 + acc_sensor_time_0 = 0x18 + acc_sensor_time_1 = 0x19 + acc_sensor_time_2 = 0x1A + acc_int_stat_1 = 0x1D + acc_temp_msb = 0x22 + acc_temp_lsb = 0x23 + acc_fifo_length_0 = 0x24 + acc_fifo_length_1 = 0x25 + acc_fifo_data = 0x26 + acc_conf = 0x40 + acc_range = 0x41 + acc_fifo_downs = 0x45 + acc_fifo_wtm_0 = 0x46 + acc_fifo_wtm_1 = 0x47 + acc_fifo_config_0 = 0x48 + acc_fifo_config_1 = 0x49 + acc_int1_io_ctrl = 0x53 + acc_int2_io_ctrl = 0x54 + acc_int_map_data = 0x58 + acc_self_test = 0x6D + acc_pwr_conf = 0x7C + acc_pwr_ctrl = 0x7D + acc_soft_reset = 0x7E class BMI088: - gyro_chip_id_addr = 0x00 - gyro_rate_x_lsb_addr = 0x02 - gyro_rate_x_msb_addr = 0x03 - gyro_rate_y_lsb_addr = 0x04 - gyro_rate_y_msb_addr = 0x05 - gyro_rate_z_lsb_addr = 0x06 - gyro_rate_z_msb_addr = 0x07 - gyro_int_stat_1_addr = 0x0A - gyro_fifo_status_addr = 0x0E - gyro_range_addr = 0x0F - gyro_bandwidth_addr = 0x10 - gyro_lpm1_addr = 0x11 - gyro_soft_reset_addr = 0x14 - gyro_int_ctrl_addr = 0x15 - gyro_int3_int4_io_conf_addr = 0x16 - gyro_int3_int4_io_map_addr = 0x18 - gyro_fifo_wm_en_addr = 0x1E - gyro_fifo_ext_int_s_addr = 0x34 - gyro_self_test_addr = 0x3C - gyro_fifo_config_0_addr = 0x3D - gyro_fifo_config_1_addr = 0x3E - gyro_fifo_data_addr = 0x3F - - acc_chip_id_addr = 0x00 - acc_err_reg_addr = 0x02 - acc_status_addr = 0x03 - acc_x_lsb_addr = 0x12 - acc_x_msb_addr = 0x13 - acc_y_lsb_addr = 0x14 - acc_y_msb_addr = 0x15 - acc_z_lsb_addr = 0x16 - acc_z_msb_addr = 0x17 - acc_sensor_time_0_addr = 0x18 - acc_sensor_time_1_addr = 0x19 - acc_sensor_time_2_addr = 0x1A - acc_int_stat_1_addr = 0x1D - acc_temp_msb_addr = 0x22 - acc_temp_lsb_addr = 0x23 - acc_fifo_length_0_addr = 0x24 - acc_fifo_length_1_addr = 0x25 - acc_fifo_data_addr = 0x26 - acc_conf_addr = 0x40 - acc_range_addr = 0x41 - acc_fifo_downs_addr = 0x45 - acc_fifo_wtm_0_addr = 0x46 - acc_fifo_wtm_1_addr = 0x47 - acc_fifo_config_0_addr = 0x48 - acc_fifo_config_1_addr = 0x49 - acc_int1_io_ctrl_addr = 0x53 - acc_int2_io_ctrl_addr = 0x54 - acc_int_map_data_addr = 0x58 - acc_self_test_addr = 0x6D - acc_pwr_conf_addr = 0x7C - acc_pwr_ctrl_addr = 0x7D - acc_soft_reset_addr = 0x7E - def __init__(self) -> None: self.read_gyro: Callable | None = None self.write_gyro: Callable | None = None @@ -75,269 +80,269 @@ def assign_accel_callbacks(self, read_callback: Callable, write_callback: Callab @property def gyro_chip_id(self) -> int: - return self.read_gyro(BMI088.gyro_chip_id_addr) + return self.read_gyro(BMI088GyroAddr.gyro_chip_id) @property def gyro_rate(self) -> tuple[int, int, int]: - payload = self.read_gyro(BMI088.gyro_rate_x_lsb_addr, 6) + payload = self.read_gyro(BMI088GyroAddr.gyro_rate_x_lsb, 6) g_x, g_y, g_z = struct.unpack(" int: - return self.read_gyro(BMI088.gyro_int_stat_1_addr) + return self.read_gyro(BMI088GyroAddr.gyro_int_stat_1) @property def gyro_fifo_status(self) -> int: - return self.read_gyro(BMI088.gyro_fifo_status_addr) + return self.read_gyro(BMI088GyroAddr.gyro_fifo_status) @property def gyro_range(self) -> int: - return self.read_gyro(BMI088.gyro_range_addr) + return self.read_gyro(BMI088GyroAddr.gyro_range) @gyro_range.setter def gyro_range(self, value: int) -> None: - self.write_gyro(BMI088.gyro_range_addr, value) + self.write_gyro(BMI088GyroAddr.gyro_range, value) @property def gyro_bandwidth(self) -> int: - return self.read_gyro(BMI088.gyro_bandwidth_addr) + return self.read_gyro(BMI088GyroAddr.gyro_bandwidth) @gyro_bandwidth.setter def gyro_bandwidth(self, value: int) -> None: - self.write_gyro(BMI088.gyro_bandwidth_addr, value) + self.write_gyro(BMI088GyroAddr.gyro_bandwidth, value) @property def gyro_lpm1(self) -> int: - return self.read_gyro(BMI088.gyro_lpm1_addr) + return self.read_gyro(BMI088GyroAddr.gyro_lpm1) @gyro_lpm1.setter def gyro_lpm1(self, value: int) -> None: - self.write_gyro(BMI088.gyro_lpm1_addr, value) + self.write_gyro(BMI088GyroAddr.gyro_lpm1, value) def gyro_soft_reset(self, value: int) -> None: - self.write_gyro(BMI088.gyro_soft_reset_addr, value) + self.write_gyro(BMI088GyroAddr.gyro_soft_reset, value) gyro_soft_reset = property(None, gyro_soft_reset) @property def gyro_int_ctrl(self) -> int: - return self.read_gyro(BMI088.gyro_int_ctrl_addr) + return self.read_gyro(BMI088GyroAddr.gyro_int_ctrl) @gyro_int_ctrl.setter def gyro_int_ctrl(self, value: int) -> None: - self.write_gyro(BMI088.gyro_int_ctrl_addr, value) + self.write_gyro(BMI088GyroAddr.gyro_int_ctrl, value) @property def gyro_int3_int4_io_conf(self) -> int: - return self.read_gyro(BMI088.gyro_int3_int4_io_conf_addr) + return self.read_gyro(BMI088GyroAddr.gyro_int3_int4_io_conf) @gyro_int3_int4_io_conf.setter def gyro_int3_int4_io_conf(self, value: int) -> None: - self.write_gyro(BMI088.gyro_int3_int4_io_conf_addr, value) + self.write_gyro(BMI088GyroAddr.gyro_int3_int4_io_conf, value) @property def gyro_int3_int4_io_map(self) -> int: - return self.read_gyro(BMI088.gyro_int3_int4_io_map_addr) + return self.read_gyro(BMI088GyroAddr.gyro_int3_int4_io_map) @gyro_int3_int4_io_map.setter def gyro_int3_int4_io_map(self, value: int) -> None: - self.write_gyro(BMI088.gyro_int3_int4_io_map_addr, value) + self.write_gyro(BMI088GyroAddr.gyro_int3_int4_io_map, value) @property def gyro_fifo_wm_en(self) -> int: - return self.read_gyro(BMI088.gyro_fifo_wm_en_addr) + return self.read_gyro(BMI088GyroAddr.gyro_fifo_wm_en) @gyro_fifo_wm_en.setter def gyro_fifo_wm_en(self, value: int) -> None: - self.write_gyro(BMI088.gyro_fifo_wm_en_addr, value) + self.write_gyro(BMI088GyroAddr.gyro_fifo_wm_en, value) @property def gyro_fifo_ext_int_s(self) -> int: - return self.read_gyro(BMI088.gyro_fifo_ext_int_s_addr) + return self.read_gyro(BMI088GyroAddr.gyro_fifo_ext_int_s) @gyro_fifo_ext_int_s.setter def gyro_fifo_ext_int_s(self, value: int) -> None: - self.write_gyro(BMI088.gyro_fifo_ext_int_s_addr, value) + self.write_gyro(BMI088GyroAddr.gyro_fifo_ext_int_s, value) @property def gyro_self_test(self) -> int: - return self.read_gyro(BMI088.gyro_self_test_addr) + return self.read_gyro(BMI088GyroAddr.gyro_self_test) @gyro_self_test.setter def gyro_self_test(self, value: int) -> None: - self.write_gyro(BMI088.gyro_self_test_addr, value) + self.write_gyro(BMI088GyroAddr.gyro_self_test, value) @property def gyro_fifo_config_0(self) -> int: - return self.read_gyro(BMI088.gyro_fifo_config_0_addr) + return self.read_gyro(BMI088GyroAddr.gyro_fifo_config_0) @gyro_fifo_config_0.setter def gyro_fifo_config_0(self, value: int) -> None: - self.write_gyro(BMI088.gyro_fifo_config_0_addr, value) + self.write_gyro(BMI088GyroAddr.gyro_fifo_config_0, value) @property def gyro_fifo_config_1(self) -> int: - return self.read_gyro(BMI088.gyro_fifo_config_1_addr) + return self.read_gyro(BMI088GyroAddr.gyro_fifo_config_1) @gyro_fifo_config_1.setter def gyro_fifo_config_1(self, value: int) -> None: - self.write_gyro(BMI088.gyro_fifo_config_1_addr, value) + self.write_gyro(BMI088GyroAddr.gyro_fifo_config_1, value) @property def gyro_fifo_data(self) -> int: - return self.read_gyro(BMI088.gyro_fifo_data_addr) + return self.read_gyro(BMI088GyroAddr.gyro_fifo_data) # Accelerometer @property def acc_chip_id(self) -> int: - return self.read_accel(BMI088.acc_chip_id_addr) + return self.read_accel(BMI088AccelAddr.acc_chip_id) @property def acc_err_reg(self) -> int: - return self.read_accel(BMI088.acc_err_reg_addr) + return self.read_accel(BMI088AccelAddr.acc_err_reg) @property def acc_status(self) -> int: - return self.read_accel(BMI088.acc_status_addr) + return self.read_accel(BMI088AccelAddr.acc_status) @property def acceleration(self) -> tuple[int, int, int]: - payload = self.read_accel(BMI088.acc_x_lsb_addr, 6) + payload = self.read_accel(BMI088AccelAddr.acc_x_lsb, 6) a_x, a_y, a_z = struct.unpack(" int: - byte_0, byte_1, byte_2 = self.read_accel(BMI088.acc_sensor_time_0_addr, 3) + byte_0, byte_1, byte_2 = self.read_accel(BMI088AccelAddr.acc_sensor_time_0, 3) return (byte_2 << 16) | (byte_1 << 8) | byte_0 @property def acc_int_stat_1(self) -> int: - return self.read_accel(BMI088.acc_int_stat_1_addr) + return self.read_accel(BMI088AccelAddr.acc_int_stat_1) @property def acc_temperature(self) -> int: - msb, lsb = self.read_accel(BMI088.acc_temp_msb_addr, 2) + msb, lsb = self.read_accel(BMI088AccelAddr.acc_temp_msb, 2) return (msb << 3) | (lsb >> 5) @property def acc_fifo_length_0(self) -> int: - return self.read_accel(BMI088.acc_fifo_length_0_addr) + return self.read_accel(BMI088AccelAddr.acc_fifo_length_0) @property def acc_fifo_length_1(self) -> int: - return self.read_accel(BMI088.acc_fifo_length_1_addr) + return self.read_accel(BMI088AccelAddr.acc_fifo_length_1) @property def acc_fifo_data(self) -> int: - return self.read_accel(BMI088.acc_fifo_data_addr) + return self.read_accel(BMI088AccelAddr.acc_fifo_data) @property def acc_conf(self) -> int: - return self.read_accel(BMI088.acc_conf_addr) + return self.read_accel(BMI088AccelAddr.acc_conf) @acc_conf.setter def acc_conf(self, value: int) -> None: - self.write_accel(BMI088.acc_conf_addr, value) + self.write_accel(BMI088AccelAddr.acc_conf, value) @property def acc_range(self) -> int: - return self.read_accel(BMI088.acc_range_addr) + return self.read_accel(BMI088AccelAddr.acc_range) @acc_range.setter def acc_range(self, value: int) -> None: - self.write_accel(BMI088.acc_range_addr, value) + self.write_accel(BMI088AccelAddr.acc_range, value) @property def acc_fifo_downs(self) -> int: - return self.read_accel(BMI088.acc_fifo_downs_addr) + return self.read_accel(BMI088AccelAddr.acc_fifo_downs) @acc_fifo_downs.setter def acc_fifo_downs(self, value: int) -> None: - self.write_accel(BMI088.acc_fifo_downs_addr, value) + self.write_accel(BMI088AccelAddr.acc_fifo_downs, value) @property def acc_fifo_wtm_0(self) -> int: - return self.read_accel(BMI088.acc_fifo_wtm_0_addr) + return self.read_accel(BMI088AccelAddr.acc_fifo_wtm_0) @acc_fifo_wtm_0.setter def acc_fifo_wtm_0(self, value: int) -> None: - self.write_accel(BMI088.acc_fifo_wtm_0_addr, value) + self.write_accel(BMI088AccelAddr.acc_fifo_wtm_0, value) @property def acc_fifo_wtm_1(self) -> int: - return self.read_accel(BMI088.acc_fifo_wtm_1_addr) + return self.read_accel(BMI088AccelAddr.acc_fifo_wtm_1) @acc_fifo_wtm_1.setter def acc_fifo_wtm_1(self, value: int) -> None: - self.write_accel(BMI088.acc_fifo_wtm_1_addr, value) + self.write_accel(BMI088AccelAddr.acc_fifo_wtm_1, value) @property def acc_fifo_config_0(self) -> int: - return self.read_accel(BMI088.acc_fifo_config_0_addr) + return self.read_accel(BMI088AccelAddr.acc_fifo_config_0) @acc_fifo_config_0.setter def acc_fifo_config_0(self, value: int) -> None: - self.write_accel(BMI088.acc_fifo_config_0_addr, value) + self.write_accel(BMI088AccelAddr.acc_fifo_config_0, value) @property def acc_fifo_config_1(self) -> int: - return self.read_accel(BMI088.acc_fifo_config_1_addr) + return self.read_accel(BMI088AccelAddr.acc_fifo_config_1) @acc_fifo_config_1.setter def acc_fifo_config_1(self, value: int) -> None: - self.write_accel(BMI088.acc_fifo_config_1_addr, value) + self.write_accel(BMI088AccelAddr.acc_fifo_config_1, value) @property def acc_int1_io_ctrl(self) -> int: - return self.read_accel(BMI088.acc_int1_io_ctrl_addr) + return self.read_accel(BMI088AccelAddr.acc_int1_io_ctrl) @acc_int1_io_ctrl.setter def acc_int1_io_ctrl(self, value: int) -> None: - self.write_accel(BMI088.acc_int1_io_ctrl_addr, value) + self.write_accel(BMI088AccelAddr.acc_int1_io_ctrl, value) @property def acc_int2_io_ctrl(self) -> int: - return self.read_accel(BMI088.acc_int2_io_ctrl_addr) + return self.read_accel(BMI088AccelAddr.acc_int2_io_ctrl) @acc_int2_io_ctrl.setter def acc_int2_io_ctrl(self, value: int) -> None: - self.write_accel(BMI088.acc_int2_io_ctrl_addr, value) + self.write_accel(BMI088AccelAddr.acc_int2_io_ctrl, value) @property def acc_int_map_data(self) -> int: - return self.read_accel(BMI088.acc_int_map_data_addr) + return self.read_accel(BMI088AccelAddr.acc_int_map_data) @acc_int_map_data.setter def acc_int_map_data(self, value: int) -> None: - self.write_accel(BMI088.acc_int_map_data_addr, value) + self.write_accel(BMI088AccelAddr.acc_int_map_data, value) @property def acc_self_test(self) -> int: - return self.read_accel(BMI088.acc_self_test_addr) + return self.read_accel(BMI088AccelAddr.acc_self_test) @acc_self_test.setter def acc_self_test(self, value: int) -> None: - self.write_accel(BMI088.acc_self_test_addr, value) + self.write_accel(BMI088AccelAddr.acc_self_test, value) @property def acc_pwr_conf(self) -> int: - return self.read_accel(BMI088.acc_pwr_conf_addr) + return self.read_accel(BMI088AccelAddr.acc_pwr_conf) @acc_pwr_conf.setter def acc_pwr_conf(self, value: int) -> None: - self.write_accel(BMI088.acc_pwr_conf_addr, value) + self.write_accel(BMI088AccelAddr.acc_pwr_conf, value) @property def acc_pwr_ctrl(self) -> int: - return self.read_accel(BMI088.acc_pwr_ctrl_addr) + return self.read_accel(BMI088AccelAddr.acc_pwr_ctrl) @acc_pwr_ctrl.setter def acc_pwr_ctrl(self, value: int) -> None: - self.write_accel(BMI088.acc_pwr_ctrl_addr, value) + self.write_accel(BMI088AccelAddr.acc_pwr_ctrl, value) def acc_soft_reset(self, value: int) -> None: - self.write_accel(BMI088.acc_soft_reset_addr, value) + self.write_accel(BMI088AccelAddr.acc_soft_reset, value) acc_soft_reset = property(None, acc_soft_reset) diff --git a/src/umrx_app_v3/sensors/bmi323.py b/src/umrx_app_v3/sensors/bmi323.py new file mode 100644 index 0000000..1af4378 --- /dev/null +++ b/src/umrx_app_v3/sensors/bmi323.py @@ -0,0 +1,480 @@ +import struct +from collections.abc import Callable +from enum import Enum + + +class BMI323Addr(Enum): + chip_id = 0x00 + err_reg = 0x01 + status = 0x02 + acc_data_x = 0x03 + acc_data_y = 0x04 + acc_data_z = 0x05 + gyr_data_x = 0x06 + gyr_data_y = 0x07 + gyr_data_z = 0x08 + temp_data = 0x09 + sensor_time_0 = 0x0A + sensor_time_1 = 0x0B + sat_flags = 0x0C + int_status_int1 = 0x0D + int_status_int2 = 0x0E + int_status_ibi = 0x0F + feature_io0 = 0x10 + feature_io1 = 0x11 + feature_io2 = 0x12 + feature_io3 = 0x13 + feature_io_status = 0x14 + fifo_fill_level = 0x15 + fifo_data = 0x16 + acc_conf = 0x20 + gyr_conf = 0x21 + alt_acc_conf = 0x28 + alt_gyr_conf = 0x29 + alt_conf = 0x2A + alt_status = 0x2B + fifo_watermark = 0x35 + fifo_conf = 0x36 + fifo_ctrl = 0x37 + io_int_ctrl = 0x38 + int_conf = 0x39 + int_map1 = 0x3A + int_map2 = 0x3B + feature_ctrl = 0x40 + feature_data_addr = 0x41 + feature_data_tx = 0x42 + feature_data_status = 0x43 + feature_engine_status = 0x45 + feature_event_ext = 0x47 + io_pdn_ctrl = 0x4F + io_spi_if = 0x50 + io_pad_strength = 0x51 + io_i2c_if = 0x52 + io_odr_derivation = 0x53 + acc_dp_off_x = 0x60 + acc_dp_dgain_x = 0x61 + acc_dp_off_y = 0x62 + acc_dp_dgain_y = 0x63 + acc_dp_off_z = 0x64 + acc_dp_dgain_z = 0x65 + gyr_dp_off_x = 0x66 + gyr_dp_dgain_x = 0x67 + gyr_dp_off_y = 0x68 + gyr_dp_dgain_y = 0x69 + gyr_dp_off_z = 0x6A + gyr_dp_dgain_z = 0x6B + i3c_tc_sync_tph = 0x70 + i3c_tc_sync_tu = 0x71 + i3c_tc_sync_odr = 0x72 + cmd = 0x7E + cfg_res = 0x7F + + +class BMI323: + def __init__(self) -> None: + self.read: Callable | None = None + self.write: Callable | None = None + + def assign_callbacks(self, read_callback: Callable, write_callback: Callable) -> None: + self.read = read_callback + self.write = write_callback + + @property + def chip_id(self) -> int: + return self.read(BMI323Addr.chip_id) & 0xFF + + @property + def err_reg(self) -> int: + return self.read(BMI323Addr.err_reg) + + @err_reg.setter + def err_reg(self, value: int) -> None: + self.write(BMI323Addr.err_reg, value) + + @property + def status(self) -> int: + return self.read(BMI323Addr.status) + + @status.setter + def status(self, value: int) -> None: + self.write(BMI323Addr.status, value) + + @property + def acc_data(self) -> tuple[int, int, int]: + payload = self.read(BMI323Addr.acc_data_x, 6) + a_x, a_y, a_z = struct.unpack(" tuple[int, int, int]: + payload = self.read(BMI323Addr.gyr_data_x, 6) + g_x, g_y, g_z = struct.unpack(" int: + return self.read(BMI323Addr.temp_data) + + @property + def sensor_time(self) -> int: + sensor_time_0 = self.read(BMI323Addr.sensor_time_0) + sensor_time_1 = self.read(BMI323Addr.sensor_time_1) + return (sensor_time_1 << 16) | sensor_time_0 + + @property + def sat_flags(self) -> int: + return self.read(BMI323Addr.sat_flags) + + @property + def int_status_int1(self) -> int: + return self.read(BMI323Addr.int_status_int1) + + @property + def int_status_int2(self) -> int: + return self.read(BMI323Addr.int_status_int2) + + @property + def int_status_ibi(self) -> int: + return self.read(BMI323Addr.int_status_ibi) + + @property + def feature_io0(self) -> int: + return self.read(BMI323Addr.feature_io0) + + @feature_io0.setter + def feature_io0(self, value: int) -> None: + self.write(BMI323Addr.feature_io0, value) + + @property + def feature_io1(self) -> int: + return self.read(BMI323Addr.feature_io1) + + @property + def feature_io2(self) -> int: + return self.read(BMI323Addr.feature_io2) + + @property + def feature_io3(self) -> int: + return self.read(BMI323Addr.feature_io3) + + @property + def feature_io_status(self) -> int: + return self.read(BMI323Addr.feature_io_status) + + @feature_io_status.setter + def feature_io_status(self, value: int) -> None: + self.write(BMI323Addr.feature_io_status, value) + + @property + def fifo_fill_level(self) -> int: + return self.read(BMI323Addr.fifo_fill_level) + + @property + def fifo_data(self) -> int: + return self.read(BMI323Addr.fifo_data) + + @fifo_data.setter + def fifo_data(self, value: int) -> None: + self.write(BMI323Addr.fifo_data, value) + + @property + def acc_conf(self) -> int: + return self.read(BMI323Addr.acc_conf) + + @acc_conf.setter + def acc_conf(self, value: int) -> None: + self.write(BMI323Addr.acc_conf, value) + + @property + def gyr_conf(self) -> int: + return self.read(BMI323Addr.gyr_conf) + + @gyr_conf.setter + def gyr_conf(self, value: int) -> None: + self.write(BMI323Addr.gyr_conf, value) + + @property + def alt_acc_conf(self) -> int: + return self.read(BMI323Addr.alt_acc_conf) + + @alt_acc_conf.setter + def alt_acc_conf(self, value: int) -> None: + self.write(BMI323Addr.alt_acc_conf, value) + + @property + def alt_gyr_conf(self) -> int: + return self.read(BMI323Addr.alt_gyr_conf) + + @alt_gyr_conf.setter + def alt_gyr_conf(self, value: int) -> None: + self.write(BMI323Addr.alt_gyr_conf, value) + + @property + def alt_conf(self) -> int: + return self.read(BMI323Addr.alt_conf) + + @alt_conf.setter + def alt_conf(self, value: int) -> None: + self.write(BMI323Addr.alt_conf, value) + + @property + def alt_status(self) -> int: + return self.read(BMI323Addr.alt_status) + + @property + def fifo_watermark(self) -> int: + return self.read(BMI323Addr.fifo_watermark) + + @fifo_watermark.setter + def fifo_watermark(self, value: int) -> None: + self.write(BMI323Addr.fifo_watermark, value) + + @property + def fifo_conf(self) -> int: + return self.read(BMI323Addr.fifo_conf) + + @fifo_conf.setter + def fifo_conf(self, value: int) -> None: + self.write(BMI323Addr.fifo_conf, value) + + def fifo_ctrl(self, value: int) -> None: + self.write(BMI323Addr.fifo_ctrl, value) + + fifo_ctrl = property(None, fifo_ctrl) + + @property + def io_int_ctrl(self) -> int: + return self.read(BMI323Addr.io_int_ctrl) + + @io_int_ctrl.setter + def io_int_ctrl(self, value: int) -> None: + self.write(BMI323Addr.io_int_ctrl, value) + + @property + def int_conf(self) -> int: + return self.read(BMI323Addr.int_conf) + + @int_conf.setter + def int_conf(self, value: int) -> None: + self.write(BMI323Addr.int_conf, value) + + @property + def int_map1(self) -> int: + return self.read(BMI323Addr.int_map1) + + @int_map1.setter + def int_map1(self, value: int) -> None: + self.write(BMI323Addr.int_map1, value) + + @property + def int_map2(self) -> int: + return self.read(BMI323Addr.int_map2) + + @int_map2.setter + def int_map2(self, value: int) -> None: + self.write(BMI323Addr.int_map2, value) + + @property + def feature_ctrl(self) -> int: + return self.read(BMI323Addr.feature_ctrl) + + @feature_ctrl.setter + def feature_ctrl(self, value: int) -> None: + self.write(BMI323Addr.feature_ctrl, value) + + @property + def feature_data_addr(self) -> int: + return self.read(BMI323Addr.feature_data_addr) + + @feature_data_addr.setter + def feature_data_addr(self, value: int) -> None: + self.write(BMI323Addr.feature_data_addr, value) + + @property + def feature_data_tx(self) -> int: + return self.read(BMI323Addr.feature_data_tx) + + @feature_data_tx.setter + def feature_data_tx(self, value: int) -> None: + self.write(BMI323Addr.feature_data_tx, value) + + @property + def feature_data_status(self) -> int: + return self.read(BMI323Addr.feature_data_status) + + @property + def feature_engine_status(self) -> int: + return self.read(BMI323Addr.feature_engine_status) + + @property + def feature_event_ext(self) -> int: + return self.read(BMI323Addr.feature_event_ext) + + @property + def io_pdn_ctrl(self) -> int: + return self.read(BMI323Addr.io_pdn_ctrl) + + @io_pdn_ctrl.setter + def io_pdn_ctrl(self, value: int) -> None: + self.write(BMI323Addr.io_pdn_ctrl, value) + + @property + def io_spi_if(self) -> int: + return self.read(BMI323Addr.io_spi_if) + + @io_spi_if.setter + def io_spi_if(self, value: int) -> None: + self.write(BMI323Addr.io_spi_if, value) + + @property + def io_pad_strength(self) -> int: + return self.read(BMI323Addr.io_pad_strength) + + @io_pad_strength.setter + def io_pad_strength(self, value: int) -> None: + self.write(BMI323Addr.io_pad_strength, value) + + @property + def io_i2c_if(self) -> int: + return self.read(BMI323Addr.io_i2c_if) + + @io_i2c_if.setter + def io_i2c_if(self, value: int) -> None: + self.write(BMI323Addr.io_i2c_if, value) + + @property + def io_odr_derivation(self) -> int: + return self.read(BMI323Addr.io_odr_derivation) + + @property + def acc_dp_off_x(self) -> int: + return self.read(BMI323Addr.acc_dp_off_x) + + @acc_dp_off_x.setter + def acc_dp_off_x(self, value: int) -> None: + self.write(BMI323Addr.acc_dp_off_x, value) + + @property + def acc_dp_dgain_x(self) -> int: + return self.read(BMI323Addr.acc_dp_dgain_x) + + @acc_dp_dgain_x.setter + def acc_dp_dgain_x(self, value: int) -> None: + self.write(BMI323Addr.acc_dp_dgain_x, value) + + @property + def acc_dp_off_y(self) -> int: + return self.read(BMI323Addr.acc_dp_off_y) + + @acc_dp_off_y.setter + def acc_dp_off_y(self, value: int) -> None: + self.write(BMI323Addr.acc_dp_off_y, value) + + @property + def acc_dp_dgain_y(self) -> int: + return self.read(BMI323Addr.acc_dp_dgain_y) + + @acc_dp_dgain_y.setter + def acc_dp_dgain_y(self, value: int) -> None: + self.write(BMI323Addr.acc_dp_dgain_y, value) + + @property + def acc_dp_off_z(self) -> int: + return self.read(BMI323Addr.acc_dp_off_z) + + @acc_dp_off_z.setter + def acc_dp_off_z(self, value: int) -> None: + self.write(BMI323Addr.acc_dp_off_z, value) + + @property + def acc_dp_dgain_z(self) -> int: + return self.read(BMI323Addr.acc_dp_dgain_z) + + @acc_dp_dgain_z.setter + def acc_dp_dgain_z(self, value: int) -> None: + self.write(BMI323Addr.acc_dp_dgain_z, value) + + @property + def gyr_dp_off_x(self) -> int: + return self.read(BMI323Addr.gyr_dp_off_x) + + @gyr_dp_off_x.setter + def gyr_dp_off_x(self, value: int) -> None: + self.write(BMI323Addr.gyr_dp_off_x, value) + + @property + def gyr_dp_dgain_x(self) -> int: + return self.read(BMI323Addr.gyr_dp_dgain_x) + + @gyr_dp_dgain_x.setter + def gyr_dp_dgain_x(self, value: int) -> None: + self.write(BMI323Addr.gyr_dp_dgain_x, value) + + @property + def gyr_dp_off_y(self) -> int: + return self.read(BMI323Addr.gyr_dp_off_y) + + @gyr_dp_off_y.setter + def gyr_dp_off_y(self, value: int) -> None: + self.write(BMI323Addr.gyr_dp_off_y, value) + + @property + def gyr_dp_dgain_y(self) -> int: + return self.read(BMI323Addr.gyr_dp_dgain_y) + + @gyr_dp_dgain_y.setter + def gyr_dp_dgain_y(self, value: int) -> None: + self.write(BMI323Addr.gyr_dp_dgain_y, value) + + @property + def gyr_dp_off_z(self) -> int: + return self.read(BMI323Addr.gyr_dp_off_z) + + @gyr_dp_off_z.setter + def gyr_dp_off_z(self, value: int) -> None: + self.write(BMI323Addr.gyr_dp_off_z, value) + + @property + def gyr_dp_dgain_z(self) -> int: + return self.read(BMI323Addr.gyr_dp_dgain_z) + + @gyr_dp_dgain_z.setter + def gyr_dp_dgain_z(self, value: int) -> None: + self.write(BMI323Addr.gyr_dp_dgain_z, value) + + @property + def i3c_tc_sync_tph(self) -> int: + return self.read(BMI323Addr.i3c_tc_sync_tph) + + @i3c_tc_sync_tph.setter + def i3c_tc_sync_tph(self, value: int) -> None: + self.write(BMI323Addr.i3c_tc_sync_tph, value) + + @property + def i3c_tc_sync_tu(self) -> int: + return self.read(BMI323Addr.i3c_tc_sync_tu) + + @i3c_tc_sync_tu.setter + def i3c_tc_sync_tu(self, value: int) -> None: + self.write(BMI323Addr.i3c_tc_sync_tu, value) + + @property + def i3c_tc_sync_odr(self) -> int: + return self.read(BMI323Addr.i3c_tc_sync_odr) + + @i3c_tc_sync_odr.setter + def i3c_tc_sync_odr(self, value: int) -> None: + self.write(BMI323Addr.i3c_tc_sync_odr, value) + + def cmd(self, value: int) -> None: + self.write(BMI323Addr.cmd, value) + + cmd = property(None, cmd) + + @property + def cfg_res(self) -> int: + return self.read(BMI323Addr.cfg_res) + + @cfg_res.setter + def cfg_res(self, value: int) -> None: + self.write(BMI323Addr.cfg_res, value) diff --git a/src/umrx_app_v3/shuttle_board/bmi088/bmi088_shuttle.py b/src/umrx_app_v3/shuttle_board/bmi088/bmi088_shuttle.py index dc1d57c..3ddf006 100644 --- a/src/umrx_app_v3/shuttle_board/bmi088/bmi088_shuttle.py +++ b/src/umrx_app_v3/shuttle_board/bmi088/bmi088_shuttle.py @@ -15,7 +15,7 @@ StreamingSamplingUnit, ) from umrx_app_v3.mcu_board.commands.spi import SPIConfigureCmd -from umrx_app_v3.sensors.bmi088 import BMI088 +from umrx_app_v3.sensors.bmi088 import BMI088, BMI088AccelAddr, BMI088GyroAddr from umrx_app_v3.shuttle_board.bmi088.accel_streaming_packet import BMI088AccelPacket from umrx_app_v3.shuttle_board.bmi088.gyro_streaming_packet import BMI088GyroPacket @@ -102,6 +102,8 @@ def configure_spi(self) -> None: self.is_i2c_configured = False def read_accel_register(self, reg_addr: int, bytes_to_read: int = 1) -> array[int] | int: + if isinstance(reg_addr, BMI088AccelAddr): + reg_addr = reg_addr.value if self.is_i2c_configured: values = self.board.read_i2c(self.ACCEL_I2C_DEFAULT_ADDRESS, reg_addr, bytes_to_read) if bytes_to_read == 1: @@ -118,6 +120,8 @@ def read_accel_register(self, reg_addr: int, bytes_to_read: int = 1) -> array[in raise BMI088ShuttleError(error_message) def write_accel_register(self, reg_addr: int, value: int) -> None: + if isinstance(reg_addr, BMI088AccelAddr): + reg_addr = reg_addr.value if self.is_i2c_configured: return self.board.write_i2c(self.ACCEL_I2C_DEFAULT_ADDRESS, reg_addr, array("B", (value,))) if self.is_spi_configured: @@ -126,6 +130,8 @@ def write_accel_register(self, reg_addr: int, value: int) -> None: raise BMI088ShuttleError(error_message) def read_gyro_register(self, reg_addr: int, bytes_to_read: int = 1) -> array[int] | int: + if isinstance(reg_addr, BMI088GyroAddr): + reg_addr = reg_addr.value if self.is_i2c_configured: values = self.board.read_i2c(self.GYRO_I2C_DEFAULT_ADDRESS, reg_addr, bytes_to_read) if bytes_to_read == 1: @@ -140,6 +146,8 @@ def read_gyro_register(self, reg_addr: int, bytes_to_read: int = 1) -> array[int raise BMI088ShuttleError(error_message) def write_gyro_register(self, reg_addr: int, value: int) -> None: + if isinstance(reg_addr, BMI088GyroAddr): + reg_addr = reg_addr.value if self.is_i2c_configured: return self.board.write_i2c(self.GYRO_I2C_DEFAULT_ADDRESS, reg_addr, array("B", (value,))) if self.is_spi_configured: @@ -279,7 +287,7 @@ def start_streaming(self) -> None: def stop_streaming(self) -> None: self.board.stop_polling_streaming() time.sleep(0.15) - self.board.start_interrupt_streaming() + self.board.stop_interrupt_streaming() def decode_gyro_streaming(self, payload: array[int]) -> BMI088GyroPacket: g_x, g_y, g_z = struct.unpack(" None: + self.board: ApplicationBoard | None = kw["board"] if kw.get("board") else None + self.sensor: BMI323 = BMI323() + self.is_initialized: bool = False + self.is_i2c_configured: bool = False + self.is_spi_configured: bool = False + self.is_polling_streaming_configured: bool = False + self.is_interrupt_streaming_configured: bool = False + + def attach_to(self, board: ApplicationBoard) -> None: + self.board = board + + @classmethod + def on_hardware_v3_rev0(cls) -> Self: + return cls(board=ApplicationBoardV3Rev0()) + + @classmethod + def on_hardware_v3_rev1(cls) -> Self: + return cls(board=ApplicationBoardV3Rev1()) + + def initialize(self) -> None: + self.board.initialize() + self.board.start_communication() + self.is_initialized = True + + def check_connected_hw(self) -> None: + board_info = self.board.board_info + if board_info.shuttle_id != self.SHUTTLE_ID: + error_message = f"Expect shuttle_id={self.SHUTTLE_ID} got {board_info.shuttle_id}" + raise BMI323ShuttleError(error_message) + + def assign_sensor_callbacks(self) -> None: + self.sensor.assign_callbacks(read_callback=self.read_register, write_callback=self.write_register) + + def configure_i2c(self) -> None: + self.board.set_pin_config(self.SDO, PinDirection.OUTPUT, PinValue.LOW) + self.board.set_pin_config(self.CS, PinDirection.OUTPUT, PinValue.HIGH) + self.board.set_vdd_vddio(3.3, 3.3) + time.sleep(0.01) + self.board.configure_i2c(I2CMode.FAST_MODE) + self.assign_sensor_callbacks() + self.is_i2c_configured = True + self.is_spi_configured = False + + def configure_spi(self) -> None: + self.board.set_pin_config(self.CS, PinDirection.OUTPUT, PinValue.HIGH) + self.board.set_vdd_vddio(3.3, 3.3) + time.sleep(0.2) + if isinstance(self.board, ApplicationBoardV3Rev1): + SPIConfigureCmd.set_bus(SPIBus.BUS_1) + self.board.configure_spi() + self.assign_sensor_callbacks() + self.is_spi_configured = True + self.is_i2c_configured = False + + def read_register(self, reg_addr: int, bytes_to_read: int = -1) -> array[int] | int: + if isinstance(reg_addr, BMI323Addr): + reg_addr = reg_addr.value + if self.is_i2c_configured: + if bytes_to_read == -1: + return self.read_single_register_i2c(reg_addr) + return self.read_multiple_i2c(reg_addr, bytes_to_read) + if self.is_spi_configured: + if bytes_to_read == -1: + return self.read_single_register_spi(reg_addr) + return self.read_multiple_spi(reg_addr, bytes_to_read) + + error_message = "Configure I2C or SPI protocol prior to reading registers" + raise BMI323ShuttleError(error_message) + + def read_single_register_i2c(self, reg_addr: int) -> int: + values = self.board.read_i2c(self.I2C_DEFAULT_ADDRESS, reg_addr, bytes_to_read=4) + lsb, msb = values[2:] + return (msb << 8) | lsb + + def read_multiple_i2c(self, start_register_addr: int, bytes_to_read: int) -> array[int]: + values = self.board.read_i2c(self.I2C_DEFAULT_ADDRESS, start_register_addr, bytes_to_read + 2) + return values[2:] + + def read_single_register_spi(self, reg_addr: int) -> int: + values = self.board.read_spi(self.CS, reg_addr, 3) + lsb, msb = values[1:] + return (msb << 8) | lsb + + def read_multiple_spi(self, start_register_addr: int, bytes_to_read: int) -> array[int]: + payload = self.board.read_spi(self.CS, start_register_addr, bytes_to_read + 1) + return payload[1:] + + def write_register(self, reg_addr: int, value: int) -> None: + if isinstance(reg_addr, BMI323Addr): + reg_addr = reg_addr.value + lsb, msb = value & 0xFF, (value >> 8) & 0xFF + if self.is_i2c_configured: + return self.board.write_i2c(self.I2C_DEFAULT_ADDRESS, reg_addr, array("B", (lsb, msb))) + if self.is_spi_configured: + return self.board.write_spi(self.CS, reg_addr, array("B", (lsb, msb))) + error_message = "Configure I2C or SPI protocol prior to reading registers" + raise BMI323ShuttleError(error_message) + + def _configure_i2c_polling_streaming( + self, + sampling_time: int, + sampling_unit: StreamingSamplingUnit, + ) -> None: + self.board.streaming_polling_set_i2c_channel( + i2c_address=self.I2C_DEFAULT_ADDRESS, + sampling_time=sampling_time, + sampling_unit=sampling_unit, + register_address=BMI323Addr.acc_data_x.value, + bytes_to_read=(2 + 6 + 6 + 2 + 4), + ) + self.board.configure_streaming_polling(interface="i2c") + self.is_polling_streaming_configured = True + + def _configure_spi_polling_streaming( + self, + sampling_time: int, + sampling_unit: StreamingSamplingUnit, + ) -> None: + self.board.streaming_polling_set_spi_channel( + cs_pin=self.CS, + sampling_time=sampling_time, + sampling_unit=sampling_unit, + register_address=BMI323Addr.acc_data_x.value, + bytes_to_read=(1 + 6 + 6 + 2 + 4), + ) + self.board.configure_streaming_polling(interface="spi") + self.is_polling_streaming_configured = True + + def switch_on_accel_and_gyro(self) -> None: + self.sensor.acc_conf = 0x4027 + self.sensor.gyr_conf = 0x404B + + def configure_polling_streaming( + self, + sampling_time: int = 500, + sampling_unit: StreamingSamplingUnit = StreamingSamplingUnit.MICRO_SECOND, + ) -> None: + self.switch_on_accel_and_gyro() + if self.is_i2c_configured: + return self._configure_i2c_polling_streaming(sampling_time, sampling_unit) + if self.is_spi_configured: + return self._configure_spi_polling_streaming(sampling_time, sampling_unit) + error_message = "Configure I2C or SPI protocol first" + raise BMI323ShuttleError(error_message) + + def _configure_i2c_interrupt_streaming(self) -> None: + self.board.streaming_interrupt_set_i2c_channel( + interrupt_pin=self.INT1, + i2c_address=BMI323Shuttle.I2C_DEFAULT_ADDRESS, + register_address=BMI323Addr.acc_data_x.value, + bytes_to_read=(2 + 6), + ) + self.board.streaming_interrupt_set_i2c_channel( + interrupt_pin=self.INT2, + i2c_address=BMI323Shuttle.I2C_DEFAULT_ADDRESS, + register_address=BMI323Addr.gyr_data_x.value, + bytes_to_read=(2 + 6), + ) + self.board.configure_streaming_interrupt(interface="i2c") + self.is_interrupt_streaming_configured = True + + def _configure_spi_interrupt_streaming(self) -> None: + self.board.streaming_interrupt_set_spi_channel( + interrupt_pin=self.INT1, + cs_pin=self.CS, + register_address=BMI323Addr.acc_data_x.value, + bytes_to_read=(1 + 6), + ) + self.board.streaming_interrupt_set_spi_channel( + interrupt_pin=self.INT2, + cs_pin=self.CS, + register_address=BMI323Addr.gyr_data_x.value, + bytes_to_read=(1 + 6), + ) + self.board.configure_streaming_interrupt(interface="spi") + self.is_interrupt_streaming_configured = True + + def configure_interrupt_streaming(self) -> None: + self.switch_on_accel_and_gyro() + self.sensor.int_map2 = (0b01 << 10) | (0b10 << 8) + self.sensor.int_conf = 0x0000 + self.sensor.io_int_ctrl = (0b101 << 8) | 0b101 + time.sleep(0.02) + if self.is_i2c_configured: + return self._configure_i2c_interrupt_streaming() + if self.is_spi_configured: + return self._configure_spi_interrupt_streaming() + error_message = "Configure I2C or SPI protocol first" + raise BMI323ShuttleError(error_message) + + def start_streaming(self) -> None: + if self.is_polling_streaming_configured: + return self.board.start_polling_streaming() + if self.is_interrupt_streaming_configured: + return self.board.start_interrupt_streaming() + error_message = "Configure polling or interrupt streaming before streaming start" + raise BMI323ShuttleError(error_message) + + def stop_streaming(self) -> None: + self.board.stop_polling_streaming() + time.sleep(0.15) + self.board.stop_interrupt_streaming() diff --git a/tests/conftest.py b/tests/conftest.py index a385fd8..4381f5b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,7 +20,10 @@ StreamingPollingCmd, ) from umrx_app_v3.mcu_board.commands.timer import TimerCmd +from umrx_app_v3.sensors.bmi088 import BMI088 +from umrx_app_v3.sensors.bmi323 import BMI323 from umrx_app_v3.shuttle_board.bmi088.bmi088_shuttle import BMI088Shuttle +from umrx_app_v3.shuttle_board.bmi323.bmi323_shuttle import BMI323Shuttle handler = logging.StreamHandler(sys.stdout) handler.setFormatter(logging.Formatter("[%(asctime)s][%(levelname)-8s][%(name)s]: %(message)s")) @@ -145,3 +148,18 @@ def app_board_v3_rev1(bst_protocol_serial: BstProtocol) -> ApplicationBoardV3Rev @pytest.fixture(scope="session", autouse=True) def bmi088_shuttle(app_board_v3_rev0: ApplicationBoardV3Rev0) -> BMI088Shuttle: return BMI088Shuttle(board=app_board_v3_rev0) + + +@pytest.fixture(scope="session", autouse=True) +def bmi088() -> BMI088: + return BMI088() + + +@pytest.fixture(scope="session", autouse=True) +def bmi323_shuttle(app_board_v3_rev1: ApplicationBoardV3Rev1) -> BMI323Shuttle: + return BMI323Shuttle(board=app_board_v3_rev1) + + +@pytest.fixture(scope="session", autouse=True) +def bmi323() -> BMI323: + return BMI323() diff --git a/tests/sensors/__init__.py b/tests/sensors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/sensors/test_bmi088.py b/tests/sensors/test_bmi088.py new file mode 100644 index 0000000..ba3d783 --- /dev/null +++ b/tests/sensors/test_bmi088.py @@ -0,0 +1,48 @@ +import struct +from unittest.mock import patch + +from umrx_app_v3.sensors.bmi088 import BMI088 + + +def test_bmi088_read_properties(bmi088: BMI088) -> None: + all_properties = {key: value for key, value in bmi088.__class__.__dict__.items() if isinstance(value, property)} + write_only_properties = [ + key for key, value in all_properties.items() if (value.fset is not None) and (value.fget is None) + ] + readable_properties = all_properties.keys() - write_only_properties + + for readable_property in readable_properties: + with ( + patch.object( + bmi088, + "read_gyro", + side_effect=lambda *vals: tuple(range(vals[1])) if (isinstance(vals, tuple) and len(vals) > 1) else 1, + ) as mocked_gyro_read, + patch.object( + bmi088, + "read_accel", + side_effect=lambda *vals: tuple(range(vals[1])) if (isinstance(vals, tuple) and len(vals) > 1) else 1, + ) as mocked_acc_read, + patch.object(struct, "unpack", return_value=(1, 2, 3)), + ): + getattr(bmi088, readable_property) + if readable_property.startswith("gyro_"): + mocked_gyro_read.assert_called_once() + elif readable_property.startswith("acc_"): + mocked_acc_read.assert_called_once() + + +def test_bmi088_write_properties(bmi088: BMI088) -> None: + all_properties = {key: value for key, value in bmi088.__class__.__dict__.items() if isinstance(value, property)} + writable_properties = [key for key, value in all_properties.items() if value.fset is not None] + + for writable_property in writable_properties: + with ( + patch.object(bmi088, "write_gyro") as mocked_gyro_write, + patch.object(bmi088, "write_accel") as mocked_accel_write, + ): + setattr(bmi088, writable_property, 123) + if writable_property.startswith("gyro_"): + mocked_gyro_write.assert_called_once() + elif writable_property.startswith("acc_"): + mocked_accel_write.assert_called_once() diff --git a/tests/sensors/test_bmi323.py b/tests/sensors/test_bmi323.py new file mode 100644 index 0000000..d085c46 --- /dev/null +++ b/tests/sensors/test_bmi323.py @@ -0,0 +1,30 @@ +import struct +from unittest.mock import patch + +from umrx_app_v3.sensors.bmi323 import BMI323 + + +def test_bmi323_read_properties(bmi323: BMI323) -> None: + all_properties = {key: value for key, value in bmi323.__class__.__dict__.items() if isinstance(value, property)} + write_only_properties = [ + key for key, value in all_properties.items() if (value.fset is not None) and (value.fget is None) + ] + readable_properties = all_properties.keys() - write_only_properties + + for readable_property in readable_properties: + with patch.object(bmi323, "read") as mocked_read, patch.object(struct, "unpack", return_value=(1, 2, 3)): + getattr(bmi323, readable_property) + if readable_property == "sensor_time": + assert mocked_read.call_count == 2 + else: + mocked_read.assert_called_once() + + +def test_bmi323_write_properties(bmi323: BMI323) -> None: + all_properties = {key: value for key, value in bmi323.__class__.__dict__.items() if isinstance(value, property)} + writable_properties = [key for key, value in all_properties.items() if value.fset is not None] + + for writable_property in writable_properties: + with patch.object(bmi323, "write") as mocked_write: + setattr(bmi323, writable_property, 123) + mocked_write.assert_called_once()