Add transfer function support to DataLoader (#301)
mthrok authored Dec 26, 2024
1 parent 6e11733 commit 787b7cd
Showing 2 changed files with 50 additions and 21 deletions.
2 changes: 1 addition & 1 deletion examples/imagenet_classification.py
@@ -43,7 +43,7 @@
 import spdl.io
 import spdl.utils
 import torch
-from spdl.dataloader._dataloader import DataLoader
+from spdl.dataloader import DataLoader
 from torch import Tensor
 from torch.profiler import profile

69 changes: 49 additions & 20 deletions src/spdl/dataloader/_dataloader.py
@@ -57,11 +57,20 @@ class DataLoader(Generic[Source, Output]):
   │   └─────────────────────┘
 ┌─▼─────────────────────┐
-│ Aggregate             │
-│                       │
-│ fn: list[T] -> Output │
-│                       │
-└─┬─────────────────────┘
+│ Aggregate             │─┐
+│                       │ │─┐
+│ fn: list[T] -> Output │ │ │
+│                       │ │ │
+└──┬────────────────────┘ │ │
+   │└──┬──────────────────┘ │
+   │   └────────────────────┘
+ ┌─▼──────────────────────┐
+ │ Transfer               │
+ │                        │ * No concurrency, as GPUs do not support
+ │ fn: Output -> Output   │   transferring multiple data concurrently.
+ │                        │
+ └─┬──────────────────────┘
  ┌─▼──────┐
  │ Buffer │
@@ -85,6 +94,11 @@ class DataLoader(Generic[Source, Output]):
     drop_last: If ``True`` and the number of source items is not divisible by
         ``batch_size``, then the remainder is dropped.
+    transfer_fn: A function applied to the output of the aggregator function.
+        It is intended for transferring data to GPU devices.
+        Since GPUs do not support concurrent data transfer,
+        this function is executed in a single thread.
     buffer_size: The number of aggregated items to buffer.
     num_threads: The number of worker threads.
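
For intuition, a transfer function does not have to use spdl.io at all; any callable that maps the aggregated output to its device-side counterpart fits the signature. A minimal sketch, assuming the aggregator yields a pinned-memory torch.Tensor (the name `transfer` and the hard-coded "cuda:0" device are illustrative, not part of this commit):

    import torch

    def transfer(batch: torch.Tensor) -> torch.Tensor:
        # Pinned (page-locked) source memory makes this copy asynchronous
        # with respect to the host, so the pipeline thread is not blocked
        # for the full duration of the transfer.
        return batch.to("cuda:0", non_blocking=True)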
@@ -98,7 +112,7 @@ class DataLoader(Generic[Source, Output]):
 Examples:
     >>> import spdl.io
-    >>> from spdl.io import ImageFrames
+    >>> from spdl.io import CPUBuffer, CUDABuffer, ImageFrames
     >>>
     >>> import torch
     >>> from torch import Tensor
@@ -130,21 +144,27 @@ class DataLoader(Generic[Source, Output]):
     ...
     >>>
     >>> ##################################################################
-    >>> # Aggregator (batch + device transfer)
+    >>> # Aggregator
     >>> ##################################################################
-    >>> cuda_device_index = 0
     >>> size = width * height * batch_size * 3
     >>> storage = spdl.io.cpu_storage(size, pin_memory=True)
-    >>> stream = torch.cuda.Stream(device=cuda_device_index)
     >>>
+    >>> def batchify(data: list[ImageFrames]) -> Tensor:
+    ...     # Merge the decoded frames into the pre-allocated pinned-memory.
+    ...     return spdl.io.convert_frames(data, storage=storage)
+    ...
+    >>>
+    >>> ##################################################################
+    >>> # Transfer
+    >>> ##################################################################
+    >>> cuda_device_index = 0
+    >>> stream = torch.cuda.Stream(device=cuda_device_index)
     >>> cuda_config = spdl.io.cuda_config(
     ...     device_index=cuda_device_index,
     ...     stream=stream.cuda_stream,
     ... )
     >>>
-    >>> def batchify(data: list[ImageFrames]) -> Tensor:
-    ...     # Merge the decoded frames into the pre-allocated pinned-memory.
-    ...     cpu_buffer = spdl.io.convert_frames(data, storage=storage)
+    >>> def transfer(cpu_buffer: CPUBuffer) -> CUDABuffer:
     ...     # Send to CUDA in a separate stream.
     ...     cuda_buffer = spdl.io.transfer_buffer(cpu_buffer, cuda_config=cuda_config)
     ...     # Cast to Torch Tensor type.
@@ -156,6 +176,7 @@ class DataLoader(Generic[Source, Output]):
     ...     preprocessor=decode_image,
     ...     batch_size=batch_size,
     ...     aggregator=batchify,
+    ...     transfer_fn=transfer,
     ... )
     >>>
     >>> for batch in dataloader:
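
The loop body is truncated in the diff. Assuming `transfer` finishes by converting the CUDA buffer into a torch Tensor (the final cast is cut off above), the consuming side could look like this sketch, where `model` is a hypothetical callable:

    for batch in dataloader:
        # The batch was already moved to the GPU by transfer_fn,
        # so no explicit .to("cuda") is needed in the training loop.
        assert batch.device.type == "cuda"
        output = model(batch)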
@@ -185,6 +206,8 @@ def __init__(
         batch_size: int | None = None,
         drop_last: bool = False,
         aggregator: Callable[[list[T]], Output] | None = None,
+        # Device transfer
+        transfer_fn: Callable[[Output], Output] | None = None,
         # Buffering
         buffer_size: int = 3,
         # Execution config
@@ -195,6 +218,7 @@ def __init__(
         self._src = src
         self._preprocessor = preprocessor
         self._aggregator = aggregator
+        self._transfer_fn = transfer_fn

         self._batch_size = batch_size
         self._drop_last = drop_last
@@ -204,21 +228,26 @@ def __init__(
         self._output_order = output_order

     def _get_pipeline(self) -> Pipeline:
-        builder = PipelineBuilder()
-        builder.add_source(self._src)
+        pipe_args = {
+            "concurrency": self._num_threads,
+            "output_order": self._output_order,
+        }
+
+        builder = PipelineBuilder().add_source(self._src)
         if self._preprocessor:
-            builder.pipe(
-                self._preprocessor,
-                concurrency=self._num_threads,
-                output_order=self._output_order,
-            )
+            builder.pipe(self._preprocessor, **pipe_args)

         if self._batch_size:
             builder.aggregate(self._batch_size, drop_last=self._drop_last)

         if self._aggregator:
-            builder.pipe(self._aggregator)
+            builder.pipe(self._aggregator, **pipe_args)

+        # Transfer runs in the default thread pool (with num_threads=1)
+        # because GPU data transfer cannot be parallelized.
+        # Note: this thread pool is also used by aggregate and disaggregate.
+        if self._transfer_fn is not None:
+            builder.pipe(self._transfer_fn)
+
         builder.add_sink(self._buffer_size)

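Put together, for a loader with every option set, the builder above produces the stage order shown in the docstring diagram. A sketch mirroring _get_pipeline with hypothetical placeholder callables (preprocess, batchify, transfer) and illustrative values for concurrency, batch size, output order, and buffer size; the final build step is below the visible part of the diff:

    builder = PipelineBuilder().add_source(range(100))
    builder.pipe(preprocess, concurrency=8, output_order="completion")  # concurrent
    builder.aggregate(32, drop_last=False)                              # batching
    builder.pipe(batchify, concurrency=8, output_order="completion")    # concurrent
    builder.pipe(transfer)  # no concurrency argument: transfer runs serially
    builder.add_sink(3)     # buffer up to 3 transferred batches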
