Skip to content

Commit

Permalink
refactor(python)!: simplify marshalling of Fragment, DataFile, `O…
Browse files Browse the repository at this point in the history
…peration`, `Transaction` (#3240)

BREAKING CHANGE: `DataFile.deletion_file` is now a property, not a
method.

For `Fragment` and `Operation`, we had a sort of intermediate `inner`
layer to handle translating between Rust struct and Python objects. This
worked fine in isolation, but once you needed to convert at the top of a
hierarchy it became tedious. This was the case for `Transaction`.
`Transaction` contained an `Operation`, which could contain many
`Fragment`s, which in turn contain many `DataFile`s.

These structures are primarily data holders, so they've been made into
`dataclasses`. A newtype wrapper struct `PyLance<T>` is used to provide
implementations of `FromPyObject` and `ToPyObject`. This makes
signatures more readable, and makes the wrappers thinner. For example,
instead of a special Python `FragmentMetadata` struct, we just have a
`PyLance<Fragment>`, where `Fragment` is from the Rust crate.
  • Loading branch information
wjones127 authored Dec 20, 2024
1 parent 72ae355 commit 10e6454
Show file tree
Hide file tree
Showing 13 changed files with 797 additions and 593 deletions.
81 changes: 11 additions & 70 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import time
import uuid
import warnings
from abc import ABC, abstractmethod
from abc import ABC
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
Expand Down Expand Up @@ -49,9 +49,6 @@
CleanupStats,
_Dataset,
_MergeInsertBuilder,
_Operation,
_RewriteGroup,
_RewrittenIndex,
_Scanner,
_write_dataset,
)
Expand Down Expand Up @@ -107,7 +104,9 @@ def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None):

# These next three overrides exist only to document the methods

def when_matched_update_all(self, condition: Optional[str] = None):
def when_matched_update_all(
self, condition: Optional[str] = None
) -> "MergeInsertBuilder":
"""
Configure the operation to update matched rows
Expand All @@ -128,7 +127,7 @@ def when_matched_update_all(self, condition: Optional[str] = None):
"""
return super(MergeInsertBuilder, self).when_matched_update_all(condition)

def when_not_matched_insert_all(self):
def when_not_matched_insert_all(self) -> "MergeInsertBuilder":
"""
Configure the operation to insert not matched rows
Expand All @@ -138,7 +137,9 @@ def when_not_matched_insert_all(self):
"""
return super(MergeInsertBuilder, self).when_not_matched_insert_all()

def when_not_matched_by_source_delete(self, expr: Optional[str] = None):
def when_not_matched_by_source_delete(
self, expr: Optional[str] = None
) -> "MergeInsertBuilder":
"""
Configure the operation to delete source rows that do not match
Expand Down Expand Up @@ -2314,7 +2315,7 @@ def commit(

new_ds = _Dataset.commit(
base_uri,
operation._to_inner(),
operation,
read_version,
commit_lock,
storage_options=storage_options,
Expand Down Expand Up @@ -2413,19 +2414,6 @@ def commit_batch(
detached=detached,
max_retries=max_retries,
)
merged = Transaction(**merged)
# This logic is specific to append, which is all that should
# be returned here.
# TODO: generalize this to all other transaction types.
merged.operation["fragments"] = [
FragmentMetadata.from_metadata(f) for f in merged.operation["fragments"]
]
merged.operation = LanceOperation.Append(**merged.operation)
if merged.blobs_op:
merged.blobs_op["fragments"] = [
FragmentMetadata.from_metadata(f) for f in merged.blobs_op["fragments"]
]
merged.blobs_op = LanceOperation.Append(**merged.blobs_op)
ds = LanceDataset.__new__(LanceDataset)
ds._ds = new_ds
ds._uri = new_ds.uri
Expand Down Expand Up @@ -2511,10 +2499,6 @@ class BaseOperation(ABC):
See available operations under :class:`LanceOperation`.
"""

@abstractmethod
def _to_inner(self):
raise NotImplementedError()

@dataclass
class Overwrite(BaseOperation):
"""
Expand Down Expand Up @@ -2558,7 +2542,7 @@ class Overwrite(BaseOperation):
3 4 d
"""

new_schema: pa.Schema
new_schema: LanceSchema | pa.Schema
fragments: Iterable[FragmentMetadata]

def __post_init__(self):
Expand All @@ -2568,10 +2552,6 @@ def __post_init__(self):
)
LanceOperation._validate_fragments(self.fragments)

def _to_inner(self):
raw_fragments = [f._metadata for f in self.fragments]
return _Operation.overwrite(self.new_schema, raw_fragments)

@dataclass
class Append(BaseOperation):
"""
Expand Down Expand Up @@ -2618,10 +2598,6 @@ class Append(BaseOperation):
def __post_init__(self):
LanceOperation._validate_fragments(self.fragments)

def _to_inner(self):
raw_fragments = [f._metadata for f in self.fragments]
return _Operation.append(raw_fragments)

@dataclass
class Delete(BaseOperation):
"""
Expand Down Expand Up @@ -2690,12 +2666,6 @@ class Delete(BaseOperation):
def __post_init__(self):
LanceOperation._validate_fragments(self.updated_fragments)

def _to_inner(self):
raw_updated_fragments = [f._metadata for f in self.updated_fragments]
return _Operation.delete(
raw_updated_fragments, self.deleted_fragment_ids, self.predicate
)

@dataclass
class Merge(BaseOperation):
"""
Expand Down Expand Up @@ -2756,18 +2726,14 @@ class Merge(BaseOperation):
schema: LanceSchema | pa.Schema

def __post_init__(self):
LanceOperation._validate_fragments(self.fragments)

def _to_inner(self):
raw_fragments = [f._metadata for f in self.fragments]
if isinstance(self.schema, pa.Schema):
warnings.warn(
"Passing a pyarrow.Schema to Merge is deprecated. "
"Please use a LanceSchema instead.",
DeprecationWarning,
)
self.schema = LanceSchema.from_pyarrow(self.schema)
return _Operation.merge(raw_fragments, self.schema)
LanceOperation._validate_fragments(self.fragments)

@dataclass
class Restore(BaseOperation):
Expand All @@ -2777,9 +2743,6 @@ class Restore(BaseOperation):

version: int

def _to_inner(self):
return _Operation.restore(self.version)

@dataclass
class RewriteGroup:
"""
Expand All @@ -2789,11 +2752,6 @@ class RewriteGroup:
old_fragments: Iterable[FragmentMetadata]
new_fragments: Iterable[FragmentMetadata]

def _to_inner(self):
old_fragments = [f._metadata for f in self.old_fragments]
new_fragments = [f._metadata for f in self.new_fragments]
return _RewriteGroup(old_fragments, new_fragments)

@dataclass
class RewrittenIndex:
"""
Expand All @@ -2803,9 +2761,6 @@ class RewrittenIndex:
old_id: str
new_id: str

def _to_inner(self):
return _RewrittenIndex(self.old_id, self.new_id)

@dataclass
class Rewrite(BaseOperation):
"""
Expand All @@ -2832,11 +2787,6 @@ def __post_init__(self):
all_frags += [new for group in self.groups for new in group.new_fragments]
LanceOperation._validate_fragments(all_frags)

def _to_inner(self):
groups = [group._to_inner() for group in self.groups]
rewritten_indices = [index._to_inner() for index in self.rewritten_indices]
return _Operation.rewrite(groups, rewritten_indices)

@dataclass
class CreateIndex(BaseOperation):
"""
Expand All @@ -2849,15 +2799,6 @@ class CreateIndex(BaseOperation):
dataset_version: int
fragment_ids: Set[int]

def _to_inner(self):
return _Operation.create_index(
self.uuid,
self.name,
self.fields,
self.dataset_version,
self.fragment_ids,
)


class ScannerBuilder:
def __init__(self, ds: LanceDataset):
Expand Down
Loading

0 comments on commit 10e6454

Please sign in to comment.