Skip to content

Commit

Permalink
Merge pull request #129 from Belokuikuini/mp3-pak-read-support
Browse files Browse the repository at this point in the history
PAK support for Corruption
  • Loading branch information
henriquegemignani authored Jul 14, 2024
2 parents 8b9991f + afa3ad0 commit a38a56a
Show file tree
Hide file tree
Showing 3 changed files with 302 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/retro_data_structures/formats/pak.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def _pak_for_game(game: Game):
if game == Game.PRIME_REMASTER:
return pak_wiiu.PAK_WIIU
elif game >= Game.CORRUPTION:
raise ValueError("Unsupported game")
return pak_wii.PAK_WII
else:
return pak_gc.PAK_GC

Expand Down
168 changes: 160 additions & 8 deletions src/retro_data_structures/formats/pak_wii.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
from __future__ import annotations

import dataclasses
from typing import TYPE_CHECKING

import construct
from construct import Bytes, Const, Int32ub, PrefixedArray, Struct
from construct import Bytes, Const, FocusedSeq, IfThenElse, Int32ub, PrefixedArray, Rebuild, Struct, len_, this

from retro_data_structures import game_check
from retro_data_structures.base_resource import AssetId, AssetType, Dependency
from retro_data_structures.common_types import AssetId64, FourCC, String
from retro_data_structures.compression import LZOCompressedBlock, ZlibCompressedBlock
from retro_data_structures.construct_extensions.alignment import AlignTo
from retro_data_structures.construct_extensions.dict import make_dict

if TYPE_CHECKING:
from retro_data_structures.game_check import Game

PAKHeader = construct.Aligned(
64,
Struct(
Expand Down Expand Up @@ -38,9 +48,11 @@ def _emitparse_header(code: construct.CodeGen) -> str:
ConstructResourceHeader._emitparse = _emitparse_header

PAKNoData = Struct(
_start=construct.Tell,
_start=construct.Tell, # Should always be 0x00
_header=PAKHeader,
_table_of_contents_start=construct.Tell, # Should always be 0x40
table_of_contents=construct.Aligned(64, make_dict(Int32ub, FourCC)),
# Usually starts at 0x80, though ToC semantically has a dynamic length
_named_resources_start=construct.Tell,
named_resources=construct.Aligned(
64,
Expand All @@ -54,12 +66,152 @@ def _emitparse_header(code: construct.CodeGen) -> str:
),
),
_resources_start=construct.Tell,
_resources_start_assert=construct.Check(
construct.this.table_of_contents.STRG == construct.this._resources_start - construct.this._named_resources_start
),
resources=construct.Aligned(64, PrefixedArray(Int32ub, ConstructResourceHeader)),
_resources_end=construct.Tell,
_resources_end_assert=construct.Check(
construct.this.table_of_contents.RSHD == construct.this._resources_end - construct.this._resources_start
),
)

# Layout of a compressed resource payload: a 32-bit size prefix followed by the compressed block.
# On build the prefix is recomputed from the length of the data being written.
_decompressed_size = Rebuild(Int32ub, len_(this.data))

# The codec is chosen by game_check.uses_lzo: LZO when it holds, zlib otherwise
# (the zlib branch was added for DKCR).
_compressed_block = IfThenElse(
    game_check.uses_lzo,
    LZOCompressedBlock(this.decompressed_size),
    ZlibCompressedBlock,
)

# FocusedSeq exposes only the "data" field to callers; the size prefix stays internal.
CompressedPakResource = FocusedSeq(
    "data",
    decompressed_size=_decompressed_size,
    data=_compressed_block,
)


@dataclasses.dataclass
class PakFile:
    """One resource of a PAK, held in compressed form, uncompressed form, or both.

    Whichever representation is missing is produced on demand from the other
    one and cached on the instance.
    """

    asset_id: AssetId
    asset_type: AssetType
    # Whether the payload is written compressed when the PAK is built.
    should_compress: bool
    # At least one of the two payloads below must be set for the getters to work.
    uncompressed_data: bytes | None
    compressed_data: bytes | None
    # Optional extra container; not read or written by the methods of this class.
    extra: construct.Container | None = None

    def get_decompressed(self, target_game: Game) -> bytes:
        """Return the uncompressed payload, decompressing and caching it if needed.

        Raises:
            ValueError: if the file holds neither compressed nor uncompressed data.
        """
        if self.uncompressed_data is None:
            if self.compressed_data is None:
                # Fail here with a clear message instead of deep inside construct.
                raise ValueError(f"PakFile {self.asset_id!r} has no data to decompress")
            self.uncompressed_data = CompressedPakResource.parse(self.compressed_data, target_game=target_game)
        return self.uncompressed_data

    def get_compressed(self, target_game: Game) -> bytes:
        """Return the compressed payload, compressing and caching it if needed.

        Raises:
            ValueError: if the file holds neither compressed nor uncompressed data.
        """
        if self.compressed_data is None:
            if self.uncompressed_data is None:
                # Fail here with a clear message instead of deep inside construct.
                raise ValueError(f"PakFile {self.asset_id!r} has no data to compress")
            self.compressed_data = CompressedPakResource.build(self.uncompressed_data, target_game=target_game)
        return self.compressed_data

    def set_new_data(self, data: bytes):
        """Replace the payload with new uncompressed bytes and drop the stale compressed cache."""
        self.uncompressed_data = data
        self.compressed_data = None


@dataclasses.dataclass
class PakBody:
    """In-memory contents of a PAK: header hash, named resources and the files themselves."""

    # Hash bytes carried in the PAK header.
    md5_hash: bytes
    # (name, dependency) pairs, one per named resource.
    named_resources: list[tuple[str, Dependency]]
    # Every resource stored in the PAK, in file order.
    files: list[PakFile]


class ConstructPakWii(construct.Construct):
    """Construct that converts between a PAK byte stream and a ``PakBody``."""

    def _parse(self, stream, context, path) -> PakBody:
        """Read the header tables, then every resource payload, into a ``PakBody``.

        Raises:
            construct.ConstructError: if a resource does not start where its
                header entry says it should.
        """
        toc = PAKNoData._parsereport(stream, context, f"{path} -> header")

        AlignTo(64)._parse(stream, context, path)

        # Resource offsets are relative to the start of the DATA section.
        data_origin = construct.stream_tell(stream, path)
        parsed_files = []
        for entry in toc.resources:
            expected_position = entry.offset + data_origin
            if expected_position != construct.stream_tell(stream, path):
                raise construct.ConstructError(f"Expected resource at {expected_position}", path)

            # The payload is kept exactly as stored: entry.size bytes, untouched.
            # TODO: decide whether padding needs separate handling here.
            payload = construct.stream_read(stream, entry.size, path)
            is_compressed = entry.compressed > 0

            parsed_files.append(
                PakFile(
                    asset_id=entry.asset_id,
                    asset_type=entry.asset_type,
                    should_compress=is_compressed,
                    uncompressed_data=None if is_compressed else payload,
                    compressed_data=payload if is_compressed else None,
                )
            )

        return PakBody(
            md5_hash=toc._header.md5_hash,
            named_resources=[
                (item.name, Dependency(type=item.asset_type, id=item.asset_id)) for item in toc.named_resources
            ],
            files=parsed_files,
        )

    def _build(self, obj: PakBody, stream, context, path):
        """Write ``obj`` as a PAK: header tables first, then the 64-byte padded payloads.

        The header is written twice. The first pass uses placeholder sizes and
        offsets to reserve space and measure the tables; after the payloads are
        written, the stream is rewound and the header rewritten with real values.
        """
        assert isinstance(obj, PakBody)

        header = construct.Container(
            # Sizes and per-resource size/offset/compressed start as placeholders;
            # they are filled in below once the real values are known.
            _header=construct.Container(header_size=0, md5_hash=obj.md5_hash),
            table_of_contents=construct.Container(
                STRG=0,  # Named resources size
                RSHD=0,  # Resource table size
                DATA=0,  # Data section size
            ),
            named_resources=construct.ListContainer(
                construct.Container(asset_type=dep.type, asset_id=dep.id, name=name)
                for name, dep in obj.named_resources
            ),
            resources=construct.ListContainer(
                construct.Container(compressed=0, asset_type=file.asset_type, asset_id=file.asset_id, size=0, offset=0)
                for file in obj.files
            ),
        )

        header_start = construct.stream_tell(stream, path)

        # First pass: the positions recorded while building give each table's size.
        first_pass = PAKNoData._build(header, stream, context, path)
        header_size = first_pass._table_of_contents_start - first_pass._start
        named_size = first_pass._resources_start - first_pass._named_resources_start
        table_size = first_pass._resources_end - first_pass._resources_start
        AlignTo(64)._build(None, stream, context, path)

        data_start = construct.stream_tell(stream, path)
        data_size = 0

        for file, entry in zip(obj.files, header.resources):
            wants_compression = file.should_compress
            game = game_check.get_current_game(context)
            if wants_compression:
                payload = file.get_compressed(game)
            else:
                payload = file.get_decompressed(game)

            # TODO: if compressing makes the file bigger, store it uncompressed instead.

            # Pad with 0xFF up to the next multiple of 64 bytes.
            padding = -len(payload) % 64
            if padding:
                payload += b"\xff" * padding

            entry.offset = construct.stream_tell(stream, path) - data_start
            entry.size = len(payload)
            entry.compressed = int(wants_compression)
            data_size += len(payload)
            construct.stream_write(stream, payload, len(payload), path)

        # Second pass: rewrite the header with the real sizes and offsets, then
        # put the stream position back at the end of the data.
        files_end = construct.stream_tell(stream, path)
        header._header.header_size = header_size
        header.table_of_contents["STRG"] = named_size
        header.table_of_contents["RSHD"] = table_size
        header.table_of_contents["DATA"] = data_size
        construct.stream_seek(stream, header_start, 0, path)
        PAKNoData._build(header, stream, context, path)
        construct.stream_seek(stream, files_end, 0, path)


# Shared instance used to parse and build PAK files in this format.
PAK_WII = ConstructPakWii()
141 changes: 141 additions & 0 deletions tests/formats/test_pak_wii.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
from __future__ import annotations

# The two following imports are only used by file tests
# from glob import glob
# from os import path
from retro_data_structures.formats.pak import Pak
from retro_data_structures.formats.pak_wii import (
PAK_WII,
PAKNoData,
)

# The two following classes are only used by file tests
# PakFile,
# PakBody
from retro_data_structures.game_check import Game

# ruff: noqa: E501

# The following variables are only used for the file tests and should be set before executing said tests locally
# pak_target = "."
# pak_build_dir = "."

# Base names (without the ".pak" extension) of the PAK archives used by the tests in this module.
_metroid_worlds = {f"Metroid{index}" for index in (1, 3, 4, 5, 6, 7, 8)}

paks = {
    "FrontEnd",
    "GuiDVD",
    "GuiNAND",
    "InGameAudio",
    "InGameDVD",
    "InGameNAND",
    "Logbook",
    "MiscData",
    "NoARAM",
    "SamGunFx",
    "SamusGun",
    "UniverseArea",
    "Worlds",
} | _metroid_worlds


def test_identical_when_keep_data(prime3_iso_provider):
    """Parsing a PAK and building it again must reproduce the original bytes exactly."""
    for pak_name in paks:
        with prime3_iso_provider.open_binary(f"{pak_name}.pak") as stream:
            original = stream.read()

        rebuilt = Pak.parse(original, target_game=Game.CORRUPTION).build()

        assert original == rebuilt


def test_compare_header_keep_data(prime3_iso_provider):
    """A rebuilt PAK must list the same (compressed, offset, size) for every resource."""
    game = Game.CORRUPTION

    def resource_entries(pak_bytes):
        # Only the header tables are parsed; the resource payloads are not inspected.
        header = PAKNoData.parse(pak_bytes, target_game=game)
        return [(entry.compressed, entry.offset, entry.size) for entry in header.resources]

    for pak_name in paks:
        with prime3_iso_provider.open_binary(f"{pak_name}.pak") as stream:
            original = stream.read()

        expected = resource_entries(original)

        body = PAK_WII.parse(original, target_game=game)
        rebuilt = PAK_WII.build(body, target_game=game)

        assert resource_entries(rebuilt) == expected


# The following tests are "file tests":
# they produce or read local files whose locations are given by the two global variables pak_target and pak_build_dir.
# They must NOT be executed as tests by CI; they are kept here only so the testing process can be reviewed.

# def test_write_new_pak():
# game = Game.CORRUPTION
# files = [
# PakFile(0xDEADBEEF, "STRG", False, b"abcdefg", None),
# PakFile(0xDEADD00D, "STRG", False, b"hijklmn", None)
# ]
# body = PakBody(b"joe mama so fat ", [
# ("Hey its me Jack Block from minecraft",
# Dependency("STRG", 0xDEADBEEF))
# ],
# files
# )

# output_pak = Pak(body, game)
# encoded = output_pak.build()

# with open(pak_target, "wb") as fd :
# fd.write(encoded)

# def test_build_from_extracted_pak():
# game = Game.CORRUPTION

# files = []
# for file in glob(pak_build_dir + "/*"):
# asset_id, asset_type = file.split(".")
# asset_id = int(path.basename(asset_id), 16)

# data = b""
# with open(file, "rb") as fd:
# data = fd.read()

# files.append(PakFile(asset_id, asset_type, False, data, None))

# body = PakBody(b"\x1B\x62\xF7\xCA\x15\x60\xB1\x85\xC1\xE1\x09\x43\x99\x4F\xB9\xAC", [
# ("03b_Bryyo_Fire_#SERIAL#",
# Dependency("MLVL", 0x9BA9292D588D6EB8)),
# ("03b_Bryyo_Reptilicus_#SERIAL#",
# Dependency("MLVL", 0x9F059B53561A9695)),
# ("03b_Bryyo_Ice_#SERIAL#",
# Dependency("MLVL", 0xB0D67636D61F3868))
# ],
# files
# )

# output_pak = Pak(body, game)
# encoded = output_pak.build()

# with open(pak_target, "wb") as fd:
# fd.write(encoded)

# def test_parse_new_pak():
# game = Game.CORRUPTION

# with open(pak_target, "rb") as fd:
# raw = fd.read()

# decoded = Pak.parse(raw, game)
# return decoded

0 comments on commit a38a56a

Please sign in to comment.