From cea74095c1a225306ded9e46ed2900b51d36ab78 Mon Sep 17 00:00:00 2001 From: Adrian Immer <62163284+lebuni@users.noreply.github.com> Date: Sun, 10 Nov 2024 19:15:21 +0100 Subject: [PATCH] Improve speed of TRCReader Rewrote some lines in the TRCReader to optimize for speed, mainly for TRC files v2.x. According to cProfile it's double as fast now. --- can/io/trc.py | 83 +++++++++++++++++++++++---------------------------- 1 file changed, 37 insertions(+), 46 deletions(-) diff --git a/can/io/trc.py b/can/io/trc.py index f0595c23e..2dbe3763c 100644 --- a/can/io/trc.py +++ b/can/io/trc.py @@ -11,15 +11,12 @@ import os from datetime import datetime, timedelta, timezone from enum import Enum -from typing import Any, Callable, Dict, Generator, List, Optional, TextIO, Union +from typing import Any, Callable, Dict, Generator, Optional, TextIO, Tuple, Union from ..message import Message from ..typechecking import StringPathLike -from ..util import channel2int, dlc2len, len2dlc -from .generic import ( - TextIOMessageReader, - TextIOMessageWriter, -) +from ..util import channel2int, len2dlc +from .generic import TextIOMessageReader, TextIOMessageWriter logger = logging.getLogger("can.io.trc") @@ -58,13 +55,22 @@ def __init__( """ super().__init__(file, mode="r") self.file_version = TRCFileVersion.UNKNOWN - self.start_time: Optional[datetime] = None + self._start_time: float = 0 self.columns: Dict[str, int] = {} + self._num_columns = -1 if not self.file: raise ValueError("The given file cannot be None") - self._parse_cols: Callable[[List[str]], Optional[Message]] = lambda x: None + self._parse_cols: Callable[[Tuple[str, ...]], Optional[Message]] = ( + lambda x: None + ) + + @property + def start_time(self) -> Optional[datetime]: + if self._start_time: + return datetime.fromtimestamp(self._start_time, timezone.utc) + return None def _extract_header(self): line = "" @@ -89,9 +95,10 @@ def _extract_header(self): elif line.startswith(";$STARTTIME"): logger.debug("TRCReader: Found start time '%s'", line) try: - self.start_time = datetime( - 1899, 12, 30, tzinfo=timezone.utc - ) + timedelta(days=float(line.split("=")[1])) + self._start_time = ( + datetime(1899, 12, 30, tzinfo=timezone.utc) + + timedelta(days=float(line.split("=")[1])) + ).timestamp() except IndexError: logger.debug("TRCReader: Failed to parse start time") elif line.startswith(";$COLUMNS"): @@ -99,6 +106,7 @@ def _extract_header(self): try: columns = line.split("=")[1].split(",") self.columns = {column: columns.index(column) for column in columns} + self._num_columns = len(columns) - 1 except IndexError: logger.debug("TRCReader: Failed to parse columns") elif line.startswith(";"): @@ -107,7 +115,7 @@ def _extract_header(self): break if self.file_version >= TRCFileVersion.V1_1: - if self.start_time is None: + if self._start_time is None: raise ValueError("File has no start time information") if self.file_version >= TRCFileVersion.V2_0: @@ -132,7 +140,7 @@ def _extract_header(self): return line - def _parse_msg_v1_0(self, cols: List[str]) -> Optional[Message]: + def _parse_msg_v1_0(self, cols: Tuple[str, ...]) -> Optional[Message]: arbit_id = cols[2] if arbit_id == "FFFFFFFF": logger.info("TRCReader: Dropping bus info line") @@ -147,16 +155,11 @@ def _parse_msg_v1_0(self, cols: List[str]) -> Optional[Message]: msg.data = bytearray([int(cols[i + 4], 16) for i in range(msg.dlc)]) return msg - def _parse_msg_v1_1(self, cols: List[str]) -> Optional[Message]: + def _parse_msg_v1_1(self, cols: Tuple[str, ...]) -> Optional[Message]: arbit_id = cols[3] msg = Message() - if isinstance(self.start_time, datetime): - msg.timestamp = ( - self.start_time + timedelta(milliseconds=float(cols[1])) - ).timestamp() - else: - msg.timestamp = float(cols[1]) / 1000 + msg.timestamp = float(cols[1]) / 1000 + self._start_time msg.arbitration_id = int(arbit_id, 16) msg.is_extended_id = len(arbit_id) > 4 msg.channel = 1 @@ -165,16 +168,11 @@ def _parse_msg_v1_1(self, cols: List[str]) -> Optional[Message]: msg.is_rx = cols[2] == "Rx" return msg - def _parse_msg_v1_3(self, cols: List[str]) -> Optional[Message]: + def _parse_msg_v1_3(self, cols: Tuple[str, ...]) -> Optional[Message]: arbit_id = cols[4] msg = Message() - if isinstance(self.start_time, datetime): - msg.timestamp = ( - self.start_time + timedelta(milliseconds=float(cols[1])) - ).timestamp() - else: - msg.timestamp = float(cols[1]) / 1000 + msg.timestamp = float(cols[1]) / 1000 + self._start_time msg.arbitration_id = int(arbit_id, 16) msg.is_extended_id = len(arbit_id) > 4 msg.channel = int(cols[2]) @@ -183,7 +181,7 @@ def _parse_msg_v1_3(self, cols: List[str]) -> Optional[Message]: msg.is_rx = cols[3] == "Rx" return msg - def _parse_msg_v2_x(self, cols: List[str]) -> Optional[Message]: + def _parse_msg_v2_x(self, cols: Tuple[str, ...]) -> Optional[Message]: type_ = cols[self.columns["T"]] bus = self.columns.get("B", None) @@ -192,32 +190,25 @@ def _parse_msg_v2_x(self, cols: List[str]) -> Optional[Message]: dlc = len2dlc(length) elif "L" in self.columns: dlc = int(cols[self.columns["L"]]) - length = dlc2len(dlc) else: raise ValueError("No length/dlc columns present.") msg = Message() - if isinstance(self.start_time, datetime): - msg.timestamp = ( - self.start_time + timedelta(milliseconds=float(cols[self.columns["O"]])) - ).timestamp() - else: - msg.timestamp = float(cols[1]) / 1000 + msg.timestamp = float(cols[self.columns["O"]]) / 1000 + self._start_time msg.arbitration_id = int(cols[self.columns["I"]], 16) msg.is_extended_id = len(cols[self.columns["I"]]) > 4 msg.channel = int(cols[bus]) if bus is not None else 1 msg.dlc = dlc - msg.data = bytearray( - [int(cols[i + self.columns["D"]], 16) for i in range(length)] - ) + if dlc: + msg.data = bytearray.fromhex(cols[self.columns["D"]]) msg.is_rx = cols[self.columns["d"]] == "Rx" - msg.is_fd = type_ in ["FD", "FB", "FE", "BI"] - msg.bitrate_switch = type_ in ["FB", " FE"] - msg.error_state_indicator = type_ in ["FE", "BI"] + msg.is_fd = type_ in {"FD", "FB", "FE", "BI"} + msg.bitrate_switch = type_ in {"FB", "FE"} + msg.error_state_indicator = type_ in {"FE", "BI"} return msg - def _parse_cols_v1_1(self, cols: List[str]) -> Optional[Message]: + def _parse_cols_v1_1(self, cols: Tuple[str, ...]) -> Optional[Message]: dtype = cols[2] if dtype in ("Tx", "Rx"): return self._parse_msg_v1_1(cols) @@ -225,7 +216,7 @@ def _parse_cols_v1_1(self, cols: List[str]) -> Optional[Message]: logger.info("TRCReader: Unsupported type '%s'", dtype) return None - def _parse_cols_v1_3(self, cols: List[str]) -> Optional[Message]: + def _parse_cols_v1_3(self, cols: Tuple[str, ...]) -> Optional[Message]: dtype = cols[3] if dtype in ("Tx", "Rx"): return self._parse_msg_v1_3(cols) @@ -233,9 +224,9 @@ def _parse_cols_v1_3(self, cols: List[str]) -> Optional[Message]: logger.info("TRCReader: Unsupported type '%s'", dtype) return None - def _parse_cols_v2_x(self, cols: List[str]) -> Optional[Message]: + def _parse_cols_v2_x(self, cols: Tuple[str, ...]) -> Optional[Message]: dtype = cols[self.columns["T"]] - if dtype in ["DT", "FD", "FB"]: + if dtype in {"DT", "FD", "FB", "FE", "BI"}: return self._parse_msg_v2_x(cols) else: logger.info("TRCReader: Unsupported type '%s'", dtype) @@ -244,7 +235,7 @@ def _parse_cols_v2_x(self, cols: List[str]) -> Optional[Message]: def _parse_line(self, line: str) -> Optional[Message]: logger.debug("TRCReader: Parse '%s'", line) try: - cols = line.split() + cols = tuple(line.split(maxsplit=self._num_columns)) return self._parse_cols(cols) except IndexError: logger.warning("TRCReader: Failed to parse message '%s'", line)