Skip to content

Commit

Permalink
address code review
Browse files: browse the repository at this point in the history
  • Loading branch information
rjzamora committed Jan 27, 2025
1 parent 86fad9d commit 9624396
Showing 1 changed file with 14 additions and 20 deletions.
34 changes: 14 additions & 20 deletions python/cudf_polars/cudf_polars/experimental/shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import json
import operator
from functools import reduce
from typing import TYPE_CHECKING, Any

import pyarrow as pa
Expand Down Expand Up @@ -167,25 +166,20 @@ def _(
# be handled separately.
from cudf_polars.experimental.parallel import PartitionInfo

# Extract child partitioning
children, _partition_info = zip(*(rec(c) for c in ir.children), strict=True)
partition_info = reduce(operator.or_, _partition_info)
pi = partition_info[children[0]]

# Check if we are already shuffled or update partition_info
if ir.keys == pi.partitioned_on:
# Already shuffled!
new_node = children[0]
else:
new_node = ir.reconstruct(children)
partition_info[new_node] = PartitionInfo(
# Default shuffle preserves partition count
count=pi.count,
# Add partitioned_on info
partitioned_on=ir.keys,
)

return new_node, partition_info
(child,) = ir.children

new_child, pi = rec(child)
if ir.keys == pi[new_child].partitioned_on:
# Already shuffled
return new_child, pi
new_node = ir.reconstruct([new_child])
pi[new_node] = PartitionInfo(
# Default shuffle preserves partition count
count=pi[new_child].count,
# Add partitioned_on info
partitioned_on=ir.keys,
)
return new_node, pi


@generate_ir_tasks.register(Shuffle)
Expand Down

0 comments on commit 9624396

Please sign in to comment.