From 3ef266a19b017a98fb271029ef1f4023689bff52 Mon Sep 17 00:00:00 2001 From: Henrique Gemignani Passos Lima Date: Wed, 26 Jun 2024 20:56:21 +0300 Subject: [PATCH] Add GameDisc, a pure python alternative for nod --- src/retro_data_structures/asset_manager.py | 66 ++++++++---- src/retro_data_structures/formats/dol.py | 24 +++++ src/retro_data_structures/game_disc.py | 112 +++++++++++++++++++++ src/retro_data_structures/gc_disc.py | 79 +++++++++++++++ tests/conftest.py | 9 +- tests/formats/test_pak_gc.py | 6 +- tests/test_game_disc.py | 27 +++++ 7 files changed, 300 insertions(+), 23 deletions(-) create mode 100644 src/retro_data_structures/formats/dol.py create mode 100644 src/retro_data_structures/game_disc.py create mode 100644 src/retro_data_structures/gc_disc.py create mode 100644 tests/test_game_disc.py diff --git a/src/retro_data_structures/asset_manager.py b/src/retro_data_structures/asset_manager.py index ebe054b..f05ea0d 100644 --- a/src/retro_data_structures/asset_manager.py +++ b/src/retro_data_structures/asset_manager.py @@ -7,6 +7,7 @@ import typing from collections import defaultdict +import construct import nod from retro_data_structures import formats @@ -24,13 +25,12 @@ from retro_data_structures.formats import Dgrp, dependency_cheating from retro_data_structures.formats.audio_group import Agsc, Atbl from retro_data_structures.formats.pak import Pak +from retro_data_structures.game_disc import GameDisc if typing.TYPE_CHECKING: from collections.abc import Iterator from pathlib import Path - import construct - from retro_data_structures.formats.ancs import Ancs from retro_data_structures.game_check import Game @@ -48,6 +48,9 @@ def rglob(self, pattern: str) -> Iterator[str]: def open_binary(self, name: str) -> typing.BinaryIO: raise NotImplementedError + def read_binary(self, name: str) -> bytes: + raise NotImplementedError + def get_dol(self) -> bytes: raise NotImplementedError @@ -71,24 +74,42 @@ def rglob(self, name: str) -> Iterator[str]: def 
def __init__(self, iso_path: Path):
    """Open ``iso_path``, preferring the pure-python reader over nod.

    ``GameDisc.parse`` is tried first. If it raises ``construct.ConstError``
    the image is opened with nod instead, and ``self.game_disc`` stays
    ``None`` so the other methods know which backend to use.

    Args:
        iso_path: Path of the disc image to open.

    Raises:
        ValueError: nod could not open the image, or the image has no data
            partition.
    """
    self.iso_path = iso_path
    self.game_disc = None

    # Only the parse call decides whether we fall back, so it is the only
    # statement inside the ``try``. Leaving the handler before touching nod
    # also keeps the ValueErrors below from being reported as "during
    # handling of" the ConstError.
    try:
        self.game_disc = GameDisc.parse(iso_path)
    except construct.ConstError:
        # Fallback to nod, likely a Wii ISO
        pass
    else:
        self.all_files = self.game_disc.files()
        return

    result = nod.open_disc_from_image(iso_path)
    if result is None:
        raise ValueError(f"{iso_path} is not a GC/Wii ISO")

    self.disc = result[0]
    self.data = self.disc.get_data_partition()
    if self.data is None:
        raise ValueError(f"{iso_path} does not have data")

    self.all_files = self.data.files()
def calculate_size_from_header(header: construct.Container) -> int:
    """Return the size in bytes of the DOL file described by ``header``.

    The size is the furthest end position (``offset + size``) of any text or
    data section. Unlike adding every section size to ``text_offset[0]``,
    this does not assume that the sections are stored back to back, so a
    file with gaps between sections is not truncated. For a contiguous
    layout both computations give the same value.

    Args:
        header: A parsed ``DolHeader``; only ``text_offset``, ``text_size``,
            ``data_offset`` and ``data_size`` are read.

    Returns:
        The number of bytes from the start of the header to the end of the
        last section, or 0 when the header lists no sections at all.
    """
    offsets = list(header.text_offset) + list(header.data_offset)
    sizes = list(header.text_size) + list(header.data_size)
    return max((offset + size for offset, size in zip(offsets, sizes)), default=0)
@dataclasses.dataclass
class FileEntry:
    """Where the data of one file lives inside the disc image."""

    # Byte position of the file data, as passed to ``seek`` on the image.
    offset: int
    # Length of the file data in bytes.
    size: int


# Nested mapping of path segment -> file entry, or -> mapping for a directory.
FileTree: typing.TypeAlias = dict[str, typing.Union[FileEntry, "FileTree"]]


class _BoundedFileReader(io.RawIOBase):
    """Read-only, seekable window of ``size`` bytes of ``raw`` starting at ``start``."""

    def __init__(self, raw: io.RawIOBase, start: int, size: int):
        super().__init__()
        self._raw = raw
        self._start = start
        self._size = size
        # Position relative to ``start``; 0 is the first byte of the window.
        self._position = 0

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return True

    def tell(self) -> int:
        return self._position

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        if whence == io.SEEK_SET:
            target = offset
        elif whence == io.SEEK_CUR:
            target = self._position + offset
        elif whence == io.SEEK_END:
            target = self._size + offset
        else:
            raise ValueError(f"invalid whence ({whence})")
        if target < 0:
            raise ValueError(f"negative seek position {target}")
        self._position = target
        return target

    def readinto(self, buffer) -> int:
        remaining = self._size - self._position
        if remaining <= 0:
            # End of the window: report EOF even though the image goes on.
            return 0
        # Clip the destination so the underlying read cannot cross the end
        # of the window.
        window = memoryview(buffer).cast("B")[:remaining]
        self._raw.seek(self._start + self._position)
        count = self._raw.readinto(window) or 0
        self._position += count
        return count

    def close(self) -> None:
        if not self.closed:
            try:
                self._raw.close()
            finally:
                super().close()


class GameDisc:
    """Pure-python reader for the file system and main executable of a disc image."""

    _file_path: Path
    _raw: construct.Container
    _file_tree: FileTree

    def __init__(self, file_path: Path, raw: construct.Container, file_tree: FileTree):
        """Store an already parsed disc.

        Args:
            file_path: Path of the disc image; reopened for every read.
            raw: The container produced by ``GcDisc.parse_stream``.
            file_tree: Directory tree built from the file system table.
        """
        self._file_path = file_path
        self._raw = raw
        self._file_tree = file_tree

    @classmethod
    def parse(cls, file_path: Path) -> GameDisc:
        """Parse the image at ``file_path`` and build its directory tree.

        The file system table is a flat list of entries. A directory entry
        stores in ``param`` the index of the first entry that is no longer
        inside it, which is how the nesting is rebuilt here.

        Args:
            file_path: Path of the disc image.

        Returns:
            A new instance of ``cls``.

        Raises:
            Whatever ``GcDisc.parse_stream`` raises for an image it cannot
            parse; ``IsoFileProvider`` relies on ``construct.ConstError``
            to fall back to nod.
        """
        with file_path.open("rb") as source:
            data = GcDisc.parse_stream(source)

        file_tree: dict = {}
        current_dir = file_tree

        # entry index -> directories to return to once that index is reached.
        end_folder = collections.defaultdict(list)

        name_format = construct.CString("ascii")
        names_stream = io.BytesIO(data.fst.names)
        for i, file in enumerate(data.fst.file_entries):
            if i == 0:
                # Entry 0 is the root directory itself.
                continue

            if i in end_folder:
                # Several nested directories may end at the same index. The
                # first one registered is the outermost, so its parent is
                # where parsing continues.
                current_dir = end_folder.pop(i)[0]

            names_stream.seek(file.file_name)
            name = name_format.parse_stream(names_stream)
            if file.is_directory:
                new_dir = {}
                end_folder[file.param].append(current_dir)
                current_dir[name] = new_dir
                current_dir = new_dir
            else:
                current_dir[name] = FileEntry(
                    offset=file.offset,
                    size=file.param,
                )

        return cls(file_path, data, file_tree)

    def _get_file_entry(self, name: str) -> FileEntry:
        """Return the entry for the ``/``-separated path ``name``.

        Raises:
            KeyError: A segment of ``name`` does not exist.
            OSError: ``name`` refers to a directory.
        """
        file_entry = self._file_tree
        for segment in name.split("/"):
            file_entry = file_entry[segment]

        if isinstance(file_entry, FileEntry):
            return file_entry
        else:
            raise OSError(f"{name} is a directory")

    def files(self) -> list[str]:
        """Return the ``/``-separated path of every file, in tree order."""
        result = []

        def recurse(parent: str, tree: FileTree) -> None:
            for key, item in tree.items():
                name = f"{parent}/{key}" if parent else key

                if isinstance(item, FileEntry):
                    result.append(name)
                else:
                    recurse(name, item)

        recurse("", self._file_tree)
        return result

    def open_binary(self, name: str) -> typing.BinaryIO:
        """Open the file ``name`` for reading.

        The returned stream covers exactly the bytes of that file: position
        0 is its first byte and reading stops at its end, so ``read()``
        never returns data belonging to the rest of the image. Closing the
        stream closes the underlying image handle.

        Raises:
            KeyError: ``name`` does not exist.
            OSError: ``name`` is a directory, or the image cannot be opened.
        """
        # Resolve the entry first so a bad name never leaves a handle open.
        entry = self._get_file_entry(name)
        raw = self._file_path.open("rb", buffering=0)
        return io.BufferedReader(_BoundedFileReader(raw, entry.offset, entry.size))

    def read_binary(self, name: str) -> bytes:
        """Return the complete contents of the file ``name``."""
        entry = self._get_file_entry(name)
        with self._file_path.open("rb") as file:
            file.seek(entry.offset)
            return file.read(entry.size)

    def get_dol(self) -> bytes:
        """Return the bytes of the main executable.

        The DOL header at ``header.main_executable_offset`` is parsed to find
        out how long the executable is, then that many bytes are read from
        the same offset.
        """
        with self._file_path.open("rb") as file:
            file.seek(self._raw.header.main_executable_offset)
            header = dol.DolHeader.parse_stream(file)
            dol_size = dol.calculate_size_from_header(header)
            file.seek(self._raw.header.main_executable_offset)
            return file.read(dol_size)
# Apploader section; GcDisc parses it right after the header information.
AppLoader = construct.Struct(
    # 10 bytes of date data, padded so the field occupies 16 bytes.
    date=construct.Aligned(16, construct.Bytes(10)),
    entry_point=construct.Hex(construct.Int32ub),
    # Length of ``code``; recomputed from ``code`` when building.
    _size=construct.Rebuild(construct.Int32ub, construct.len_(construct.this.code)),
    trailer_size=construct.Int32ub,
    code=construct.Bytes(construct.this._size),
)

# One 12-byte record of the file system table (FST).
FileEntry = construct.Struct(
    is_directory=construct.Flag,
    # Position of the entry's name; GameDisc.parse seeks to this value inside
    # the ``names`` blob and reads a NUL-terminated ASCII string there.
    file_name=construct.Int24ub,
    offset=construct.Int32ub,
    # Meaning depends on ``is_directory``: GameDisc.parse uses it as the file
    # size for files, and as the index of the entry at which the directory
    # ends for directories.
    param=construct.Int32ub,
)
# The first FST record. Parsing fails with a ConstError unless it is a
# directory with a zero name and a zero offset.
RootFileEntry = construct.Struct(
    is_directory=construct.Const(True, construct.Flag),
    file_name=construct.Const(0, construct.Int24ub),
    _offset=construct.Const(0, construct.Int32ub),
    # Number of records in the table; used below as the length of
    # ``file_entries``, which starts at the root record itself.
    num_entries=construct.Int32ub,
)

# Top-level layout of the parts of a disc image that are parsed.
GcDisc = construct.Struct(
    header=DiscHeader,
    header_information=DiscHeaderInformation,
    app_loader=AppLoader,
    # Stream position right after the apploader code.
    root_offset=construct.Tell,
    # Jump to the FST, wherever the header says it is.
    _fst_seek=construct.Seek(construct.this.header.fst_offset),
    # Restrict the nested struct to ``fst_size`` bytes so ``names`` below
    # stops at the end of the table instead of the end of the image.
    fst=construct.FixedSized(
        construct.this.header.fst_size,
        construct.Struct(
            # Peek does not advance the stream, so the root record is read
            # again as ``file_entries[0]``.
            root_entry=construct.Peek(RootFileEntry),
            file_entries=FileEntry[construct.this.root_entry.num_entries],
            # Everything after the records: the blob holding the names.
            names=construct.GreedyBytes,
        ),
    ),
)
def test_prime1_dol(prime1_iso: Path) -> None:
    """The DOL read from the ``prime1_iso`` image has the expected SHA-256 digest."""
    expected_digest = (
        b"wz\x83\n8\xa1\xd2\x07\x11i\x85g\xff\x89X\xfbO\xe7$\x1ar?J\x18\xe25YP\xd7\x9f\xc8V"
    )

    extracted = game_disc.GameDisc.parse(prime1_iso).get_dol()

    assert hashlib.sha256(extracted).digest() == expected_digest


def test_prime2_dol(prime2_iso: Path) -> None:
    """The DOL read from the ``prime2_iso`` image has the expected SHA-256 digest."""
    expected_digest = (
        b"v!\xe7W\x1e\x0e\xe4\xe0\x98\xa4\x0b\xc8\xa0\xa3dx\x11\xbd\x94NC\x02R)Bl}\xea\xe1v\x06\x84"
    )

    extracted = game_disc.GameDisc.parse(prime2_iso).get_dol()

    assert hashlib.sha256(extracted).digest() == expected_digest