From d9f60f45a19ad3ca4c17572060760afef49c8fbb Mon Sep 17 00:00:00 2001 From: urasakikeisuke Date: Fri, 7 Jun 2024 16:07:24 +0900 Subject: [PATCH] fix to prevent infinite loop when loading invalid files Signed-off-by: urasakikeisuke --- src/pypcd4/pypcd4.py | 15 +++++++++------ tests/conftest.py | 8 ++++++++ tests/pcd/ascii_empty.pcd | 0 tests/pcd/ascii_invalid_header.pcd | 3 +++ tests/test/test_pypcd4.py | 10 ++++++++++ 5 files changed, 30 insertions(+), 6 deletions(-) create mode 100644 tests/pcd/ascii_empty.pcd create mode 100644 tests/pcd/ascii_invalid_header.pcd diff --git a/src/pypcd4/pypcd4.py b/src/pypcd4/pypcd4.py index 602d244..9b9d509 100644 --- a/src/pypcd4/pypcd4.py +++ b/src/pypcd4/pypcd4.py @@ -5,6 +5,7 @@ import string import struct from enum import Enum +from io import BufferedReader from pathlib import Path from typing import TYPE_CHECKING, BinaryIO, List, Literal, Optional, Sequence, Tuple, Union @@ -171,7 +172,7 @@ def build_dtype(self) -> np.dtype[np.void]: return np.dtype([x for x in zip(field_names, np_types)]) -def _parse_pc_data(fp: BinaryIO, metadata: MetaData) -> npt.NDArray: +def _parse_pc_data(fp: BufferedReader, metadata: MetaData) -> npt.NDArray: dtype = metadata.build_dtype() if metadata.points > 0: @@ -217,13 +218,15 @@ def __init__(self, metadata: MetaData, pc_data: npt.NDArray) -> None: self.pc_data = pc_data @staticmethod - def from_fileobj(fp: BinaryIO) -> PointCloud: + def from_fileobj(fp: BufferedReader) -> PointCloud: lines: List[str] = [] - while True: - line = fp.readline().strip() - lines.append(line.decode(encoding="utf-8") if isinstance(line, bytes) else line) + for bline in fp: + if (line := bline.decode(encoding="utf-8").strip()).startswith("#") or not line: + continue + + lines.append(line) - if lines[-1].startswith("DATA"): + if line.startswith("DATA") or len(lines) >= 10: break metadata = MetaData.parse_header(lines) diff --git a/tests/conftest.py b/tests/conftest.py index b73d4ad..497e960 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -315,3 +315,11 @@ def xyzrgb_binary_compressed_path(): @pytest.fixture def xyzintensity_binary_compressed_organized_path(): return f"{Path(__file__).resolve().parent}/pcd/binary_compressed_organized.pcd" + +@pytest.fixture +def ascii_empty_path(): + return f"{Path(__file__).resolve().parent}/pcd/ascii_empty.pcd" + +@pytest.fixture +def ascii_invalid_header_path(): + return f"{Path(__file__).resolve().parent}/pcd/ascii_invalid_header.pcd" diff --git a/tests/pcd/ascii_empty.pcd b/tests/pcd/ascii_empty.pcd new file mode 100644 index 0000000..e69de29 diff --git a/tests/pcd/ascii_invalid_header.pcd b/tests/pcd/ascii_invalid_header.pcd new file mode 100644 index 0000000..1d0005b --- /dev/null +++ b/tests/pcd/ascii_invalid_header.pcd @@ -0,0 +1,3 @@ +# .PCD v.7 - Point Cloud Data file format +VERSION .7 +FIEL diff --git a/tests/test/test_pypcd4.py b/tests/test/test_pypcd4.py index d32c55c..1ee948e 100644 --- a/tests/test/test_pypcd4.py +++ b/tests/test/test_pypcd4.py @@ -276,6 +276,16 @@ def test_load_binary_compressed_organized_pcd(xyzintensity_binary_compressed_org assert len(pc.pc_data) == pc.metadata.points +def test_load_ascii_empty_pcd(ascii_empty_path): + with pytest.raises(ValidationError): + PointCloud.from_path(ascii_empty_path) + + +def test_load_ascii_invalid_header_pcd(ascii_invalid_header_path): + with pytest.raises(ValidationError): + PointCloud.from_path(ascii_invalid_header_path) + + def test_from_points(): array = np.array([[1, 2, 3], [4, 5, 6]]) fields = ("x", "y", "z")