Skip to content

Commit

Permalink
Get rid of asyncio from IO module (#319)
Browse files Browse the repository at this point in the history
As Pipeline now handles sync/async fine, async wrappers are redundant.
  • Loading branch information
mthrok authored Jan 12, 2025
1 parent 0906298 commit 32d801a
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 488 deletions.
319 changes: 2 additions & 317 deletions src/spdl/io/_composite.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from spdl.lib import _libspdl

from . import _core, _preprocessing
from ._core import _FILTER_DESC_DEFAULT, run_async
from ._core import _FILTER_DESC_DEFAULT

__all__ = [
"load_audio",
Expand Down Expand Up @@ -126,53 +126,6 @@ def load_audio(
)


@overload
async def async_load_audio(
src: str | bytes,
timestamp: tuple[float, float] | None = None,
*,
demux_config: DemuxConfig | None = None,
decode_config: DecodeConfig | None = None,
filter_desc: str | None = _FILTER_DESC_DEFAULT,
device_config: None = None,
**kwargs,
) -> CPUBuffer: ...
@overload
async def async_load_audio(
src: str | bytes,
timestamp: tuple[float, float] | None = None,
*,
demux_config: DemuxConfig | None = None,
decode_config: DecodeConfig | None = None,
filter_desc: str | None = _FILTER_DESC_DEFAULT,
device_config: CUDAConfig,
**kwargs,
) -> CUDABuffer: ...


async def async_load_audio(
src,
timestamp=None,
*,
demux_config=None,
decode_config=None,
filter_desc=_FILTER_DESC_DEFAULT,
device_config=None,
**kwargs,
):
"""Async version of :py:func:`~spdl.io.load_audio`."""
return await run_async(
load_audio,
src,
timestamp,
demux_config=demux_config,
decode_config=decode_config,
filter_desc=filter_desc,
device_config=device_config,
**kwargs,
)


@overload
def load_video(
src: str | bytes,
Expand Down Expand Up @@ -233,55 +186,6 @@ def load_video(
)


@overload
async def async_load_video(
src: str | bytes,
timestamp: tuple[float, float] | None = None,
*,
demux_config: DemuxConfig | None = None,
decode_config: DecodeConfig | None = None,
filter_desc: str | None = _FILTER_DESC_DEFAULT,
device_config: None = None,
**kwargs,
) -> CPUBuffer: ...


@overload
async def async_load_video(
src: str | bytes,
timestamp: tuple[float, float] | None = None,
*,
demux_config: DemuxConfig | None = None,
decode_config: DecodeConfig | None = None,
filter_desc: str | None = _FILTER_DESC_DEFAULT,
device_config: CUDAConfig,
**kwargs,
) -> CUDABuffer: ...


async def async_load_video(
src,
timestamp=None,
*,
demux_config=None,
decode_config=None,
filter_desc=_FILTER_DESC_DEFAULT,
device_config=None,
**kwargs,
):
"""Async version of :py:func:`~spdl.io.load_video`."""
return await run_async(
load_video,
src,
timestamp,
demux_config=demux_config,
decode_config=decode_config,
filter_desc=filter_desc,
device_config=device_config,
**kwargs,
)


@overload
def load_image(
src: str | bytes,
Expand Down Expand Up @@ -339,49 +243,6 @@ def load_image(
)


@overload
async def async_load_image(
src: str | bytes,
*,
demux_config: DemuxConfig | None = None,
decode_config: DecodeConfig | None = None,
filter_desc: str | None = _FILTER_DESC_DEFAULT,
device_config: None = None,
**kwargs,
) -> CPUBuffer: ...
@overload
async def async_load_image(
src: str | bytes,
*,
demux_config: DemuxConfig | None = None,
decode_config: DecodeConfig | None = None,
filter_desc: str | None = _FILTER_DESC_DEFAULT,
device_config: CUDABuffer,
**kwargs,
) -> CUDABuffer: ...


async def async_load_image(
src,
*,
demux_config=None,
decode_config=None,
filter_desc=_FILTER_DESC_DEFAULT,
device_config=None,
**kwargs,
):
"""Async version of :py:func:`~spdl.io.load_image`."""
return await run_async(
load_image,
src,
demux_config=demux_config,
decode_config=decode_config,
filter_desc=filter_desc,
device_config=device_config,
**kwargs,
)


################################################################################
#
################################################################################
Expand Down Expand Up @@ -685,31 +546,6 @@ def _get_bytes(srcs: list[str | bytes]) -> list[bytes]:
return ret


async def async_load_image_batch_nvjpeg(
srcs: list[str | bytes],
*,
device_config: CUDAConfig,
width: int | None,
height: int | None,
pix_fmt: str | None = "rgb",
**kwargs,
):
"""**[Experimental]** Async version of :py:func:`~spdl.io.load_image_batch_nvjpeg`.
Unlike other async batch functions, this function does not employ intra-operation
parallelism. (Decoding is done sequentially.)
"""
return await run_async(
load_image_batch_nvjpeg,
srcs,
device_config=device_config,
width=width,
height=height,
pix_fmt=pix_fmt,
**kwargs,
)


def load_image_batch_nvjpeg(
srcs: list[str | bytes],
*,
Expand Down Expand Up @@ -765,158 +601,6 @@ def _decode_partial(packets, indices, decode_config, filter_desc):
return next(decoder)[indices]


async def async_sample_decode_video(
packets: VideoPackets,
indices: list[int],
*,
decode_config: DecodeConfig | None = None,
filter_desc: str | None = _FILTER_DESC_DEFAULT,
strict: bool = True,
) -> list[ImagePackets]:
"""Decode specified frames from the packets.
This function decodes the input video packets and returns the frames
specified by ``indices``. Internally, it splits the packets into
smaller set of packets and decode the minimum number of frames to
retrieve the specified frames.
.. mermaid::
block-beta
columns 15
A["Input Packets"]:15
space:15
block:B1:3
columns 3
P1["1"] P2["2"] P3["3"]
end
block:B2:3
columns 3
P4["4"] P5["5"] P6["6"]
end
block:B3:3
columns 3
P7["7"] P8["8"] P9["9"]
end
block:B4:3
columns 3
P10["10"] P11["11"] P12["12"]
end
block:B5:3
columns 3
P13["13"] P14["14"] P15["15"]
end
space:15
block:d1:3
columns 3
F1["Frame 1"] space:2
end
space:3
block:d2:3
columns 3
F7["Frame 7"] F8["Frame 8"] space
end
space:3
block:d3:3
columns 3
F13["Frame 13"] F14["Frame 14"] F15["Frame 15"]
end
space:15
space:6
block:out:3
columns 3
O1["Frame 1"] O8["Frame 8"] O15["Frame 15"]
end
space:6
A -- "Split 1" --> B1
A -- "Split 2" --> B2
A -- "Split 3" --> B3
A -- "Split 4" --> B4
A -- "Split 5" --> B5
B1 -- "Decode 1" --> d1
B3 -- "Decode 3" --> d2
B5 -- "Decode 5" --> d3
F1 --> O1
F8 --> O8
F15 --> O15
The packet splits are decoded concurrently.
The following figure illustrates the timeline of the process.
.. mermaid::
gantt
title Illustration of asynchronous sample decode timeline
dateFormat X
axisFormat %s
section Thread 1
Split Input Packets:split, 0, 3
Decode Split 1 :decode1, after split, 7
Gather and return: gather, after decode2, 14
section Thread 2
Decode Split 3 :decode2, after split, 10
section Thread 3
Decode Split 5 :decode2, after split, 13
Args:
packets: The input video packets.
indices: The list of frame indices.
decode_config:
*Optional:* Decode config.
See :py:func:`~spdl.io.decode_config`.
filter_desc: *Optional:* Filter description.
See :py:func:`~spdl.io.decode_packets` for detail.
strict: *Optional:* If True, raise an error
if any of the frames failed to decode.
Returns:
Decoded frames.
"""
if not indices:
raise ValueError("Frame indices must be non-empty.")

num_packets = len(packets)
if any(not (0 <= i < num_packets) for i in indices):
raise IndexError(f"Frame index must be [0, {num_packets}).")
if sorted(indices) != indices:
raise ValueError("Frame indices must be sorted in ascending order.")
if len(set(indices)) != len(indices):
raise ValueError("Frame indices must be unique.")

if filter_desc == _FILTER_DESC_DEFAULT:
filter_desc = _preprocessing.get_video_filter_desc()

tasks = []
for split, idxes in _libspdl._extract_packets_at_indices(packets, indices):
tasks.append(
asyncio.create_task(
run_async(_decode_partial, split, idxes, decode_config, filter_desc)
)
)

await asyncio.wait(tasks)

ret = []
for task in tasks:
try:
ret.extend(task.result())
except Exception as e:
_LG.error(f"Failed to decode {task.get_name()}. Reason: {e}")
if strict and len(ret) != len(indices):
raise RuntimeError("Failed to decode some frames.")
return ret


def sample_decode_video(
packets: VideoPackets,
indices: list[int],
Expand Down Expand Up @@ -999,6 +683,7 @@ def sample_decode_video(
F8 --> O8
F15 --> O15
Args:
packets: The input video packets.
indices: The list of frame indices.
Expand Down
Loading

0 comments on commit 32d801a

Please sign in to comment.