address review comments
rjzamora committed Dec 19, 2024
1 parent 6bf6afe commit ecc8443
Showing 1 changed file with 21 additions and 18 deletions.
39 changes: 21 additions & 18 deletions python/cudf_polars/cudf_polars/experimental/io.py
@@ -17,7 +17,7 @@
 from cudf_polars.experimental.dispatch import lower_ir_node
 
 if TYPE_CHECKING:
-    from collections.abc import Hashable, MutableMapping
+    from collections.abc import MutableMapping
 
     from cudf_polars.dsl.expr import NamedExpr
     from cudf_polars.experimental.dispatch import LowerIRTransformer
@@ -64,7 +64,16 @@ class ScanPartitionFlavor(IntEnum):
 
 
 class ScanPartitionPlan:
-    """Scan partitioning plan."""
+    """
+    Scan partitioning plan.
+
+    Notes
+    -----
+    The meaning of `factor` depends on the value of `flavor`:
+      - SINGLE_FILE: `factor` must be `1`.
+      - SPLIT_FILES: `factor` is the number of partitions per file.
+      - FUSED_FILES: `factor` is the number of files per partition.
+    """
 
     __slots__ = ("factor", "flavor")
     factor: int
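
(Aside, not part of this commit: a minimal sketch of how a plan's `flavor` and `factor` translate into an output partition count. The helper `output_partition_count` and its `n_files` argument are hypothetical, introduced only to make the three docstring cases concrete.)

    import math

    from cudf_polars.experimental.io import ScanPartitionFlavor, ScanPartitionPlan

    def output_partition_count(plan: ScanPartitionPlan, n_files: int) -> int:
        # Hypothetical helper, for illustration only.
        if plan.flavor == ScanPartitionFlavor.SINGLE_FILE:
            return n_files  # one partition per file
        if plan.flavor == ScanPartitionFlavor.SPLIT_FILES:
            return n_files * plan.factor  # `factor` partitions per file
        return math.ceil(n_files / plan.factor)  # FUSED_FILES: `factor` files per partition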
@@ -81,7 +90,6 @@ def __init__(self, factor: int, flavor: ScanPartitionFlavor) -> None:
     @staticmethod
     def from_scan(ir: Scan) -> ScanPartitionPlan:
         """Extract the partitioning plan of a Scan operation."""
-        plan = ScanPartitionPlan(1, ScanPartitionFlavor.SINGLE_FILE)
         if ir.typ == "parquet":
             # TODO: Use system info to set default blocksize
             parallel_options = ir.config_options.get("executor_options", {})
@@ -91,19 +99,19 @@ def from_scan(ir: Scan) -> ScanPartitionPlan:
             if file_size > 0:
                 if file_size > blocksize:
                     # Split large files
-                    plan = ScanPartitionPlan(
+                    return ScanPartitionPlan(
                         math.ceil(file_size / blocksize),
                         ScanPartitionFlavor.SPLIT_FILES,
                     )
                 else:
                     # Fuse small files
-                    plan = ScanPartitionPlan(
-                        max(int(blocksize / file_size), 1),
+                    return ScanPartitionPlan(
+                        max(blocksize // int(file_size), 1),
                         ScanPartitionFlavor.FUSED_FILES,
                     )
 
         # TODO: Use file sizes for csv and json
-        return plan
+        return ScanPartitionPlan(1, ScanPartitionFlavor.SINGLE_FILE)
 
 
 class SplitScan(IR):
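
(Aside, not part of this commit: a worked example of the split/fuse arithmetic above. The 1 GiB `blocksize` is assumed here purely for illustration.)

    import math

    blocksize = 1024**3  # assumed 1 GiB blocksize, for illustration only

    # A 4 GiB file exceeds the blocksize, so it is split:
    file_size = 4 * 1024**3
    print(math.ceil(file_size / blocksize))     # 4 -> SPLIT_FILES, 4 partitions per file

    # A 100 MiB file is well below the blocksize, so files are fused:
    file_size = 100 * 1024**2
    print(max(blocksize // int(file_size), 1))  # 10 -> FUSED_FILES, 10 files per partition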
@@ -123,6 +131,7 @@ class SplitScan(IR):
         "total_splits",
     )
     _non_child = (
+        "schema",
         "base_scan",
         "split_index",
         "total_splits",
@@ -134,8 +143,10 @@ class SplitScan(IR):
     total_splits: int
     """Total number of splits."""
 
-    def __init__(self, base_scan: Scan, split_index: int, total_splits: int):
-        self.schema = base_scan.schema
+    def __init__(
+        self, schema: Schema, base_scan: Scan, split_index: int, total_splits: int
+    ):
+        self.schema = schema
         self.base_scan = base_scan
         self.split_index = split_index
         self.total_splits = total_splits
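
(Aside, not part of this commit: a usage sketch of the new constructor signature, assuming `base_scan` is an existing single-file Scan node.)

    from cudf_polars.experimental.io import SplitScan

    # Split one file-backed Scan into 4 pieces; the schema is now passed explicitly.
    total_splits = 4
    pieces = [
        SplitScan(base_scan.schema, base_scan, split_index, total_splits)
        for split_index in range(total_splits)
    ]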
@@ -150,14 +161,6 @@ def __init__(self, base_scan: Scan, split_index: int, total_splits: int):
                 f"Unhandled Scan type for file splitting: {base_scan.typ}"
             )
 
-    def get_hashable(self) -> Hashable:
-        """Hashable representation of node."""
-        return (
-            self.base_scan,
-            self.split_index,
-            self.total_splits,
-        )
-
     @classmethod
     def do_evaluate(
         cls,
@@ -290,7 +293,7 @@ def _(
             ir.predicate,
         )
         slices.extend(
-            SplitScan(base_scan, sindex, plan.factor)
+            SplitScan(ir.schema, base_scan, sindex, plan.factor)
             for sindex in range(plan.factor)
        )
        new_node = Union(ir.schema, None, *slices)
