diff --git a/pyproject.toml b/pyproject.toml
index c84b873..e60c60b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,7 +16,7 @@ classifiers = [
 dynamic = ["version"]
 requires-python = ">=3.9"
 dependencies = [
-    'nested-pandas==0.1.3',
+    'nested-pandas==0.2.1',
     'numpy',
     'dask>=2024.3.0',
     'dask[distributed]>=2024.3.0',
diff --git a/src/nested_dask/core.py b/src/nested_dask/core.py
index 886700c..8d27a32 100644
--- a/src/nested_dask/core.py
+++ b/src/nested_dask/core.py
@@ -46,6 +46,18 @@ def _rebuild(self, graph, func, args):  # type: ignore
         return collection
 
 
+def _nested_meta_from_flat(flat, name):
+    """construct meta for a packed series from a flat dataframe"""
+    pd_fields = flat.dtypes.to_dict()  # grabbing pandas dtypes
+    pyarrow_fields = {}  # grab underlying pyarrow dtypes
+    for field, dtype in pd_fields.items():
+        if hasattr(dtype, "pyarrow_dtype"):
+            pyarrow_fields[field] = dtype.pyarrow_dtype
+        else:  # or convert from numpy types
+            pyarrow_fields[field] = pa.from_numpy_dtype(dtype)
+    return pd.Series(name=name, dtype=NestedDtype.from_fields(pyarrow_fields))
+
+
 class NestedFrame(
     _Frame, dd.DataFrame
 ):  # can use dd.DataFrame instead of dx.DataFrame if the config is set true (default in >=2024.3.0)
@@ -70,17 +82,6 @@ def __getitem__(self, item):
         else:
             return super().__getitem__(item)
 
-    def _nested_meta_from_flat(self, flat, name):
-        """construct meta for a packed series from a flat dataframe"""
-        pd_fields = flat.dtypes.to_dict()  # grabbing pandas dtypes
-        pyarrow_fields = {}  # grab underlying pyarrow dtypes
-        for field, dtype in pd_fields.items():
-            if hasattr(dtype, "pyarrow_dtype"):
-                pyarrow_fields[field] = dtype.pyarrow_dtype
-            else:  # or convert from numpy types
-                pyarrow_fields[field] = pa.from_numpy_dtype(dtype)
-        return pd.Series(name=name, dtype=NestedDtype.from_fields(pyarrow_fields))
-
     def __setitem__(self, key, value):
         """Adds custom __setitem__ behavior for nested columns"""
 
@@ -102,8 +103,8 @@ def __setitem__(self, key, value):
                 new_flat = new_flat.astype({col: pd.ArrowDtype(pa.string())})
 
             # pack the modified df back into a nested column
-            meta = self._nested_meta_from_flat(new_flat, nested)
-            packed = new_flat.map_partitions(lambda x: pack(x), meta=meta)
+            meta = _nested_meta_from_flat(new_flat, nested)
+            packed = new_flat.map_partitions(lambda x: pack(x, dtype=meta.dtype), meta=meta)
             return super().__setitem__(nested, packed)
 
         # Adding a new nested structure from a column
@@ -114,8 +115,8 @@ def __setitem__(self, key, value):
                 value.name = col
                 value = value.to_frame()
 
-            meta = self._nested_meta_from_flat(value, new_nested)
-            packed = value.map_partitions(lambda x: pack(x), meta=meta)
+            meta = _nested_meta_from_flat(value, new_nested)
+            packed = value.map_partitions(lambda x: pack(x, dtype=meta.dtype), meta=meta)
             return super().__setitem__(new_nested, packed)
 
         return super().__setitem__(key, value)
@@ -280,6 +281,145 @@ def from_map(
         )
         return NestedFrame.from_dask_dataframe(nf)
 
+    @classmethod
+    def from_flat(cls, df, base_columns, nested_columns=None, index=None, name="nested"):
+        """Creates a NestedFrame with base and nested columns from a flat
+        dataframe.
+
+        Parameters
+        ----------
+        df: dd.DataFrame or nd.NestedFrame
+            A flat dataframe.
+        base_columns: list-like
+            The columns that should be used as base (flat) columns in the
+            output dataframe.
+        nested_columns: list-like, or None
+            The columns that should be packed into a nested column. All columns
+            in the list will attempt to be packed into a single nested column
+            with the name provided in `name`. If None, is defined as all
+            columns not in `base_columns`.
+        index: str, or None
+            The name of a column to use as the new index. Typically, the index
+            should have a unique value per row for base columns, and should
+            repeat for nested columns. For example, a dataframe with two
+            columns; a=[1,1,1,2,2,2] and b=[5,10,15,20,25,30] would want an
+            index like [0,0,0,1,1,1] if a is chosen as a base column. If not
+            provided the current index will be used.
+        name:
+            The name of the output column the `nested_columns` are packed into.
+
+        Returns
+        -------
+        NestedFrame
+            A NestedFrame with the specified nesting structure.
+        """
+
+        # Handle meta
+        meta = npd.NestedFrame(df[base_columns]._meta)
+
+        if nested_columns is None:
+            nested_columns = [col for col in df.columns if (col not in base_columns) and col != index]
+
+        if len(nested_columns) > 0:
+            nested_meta = pack(df[nested_columns]._meta, name)
+            meta = meta.join(nested_meta)
+
+        return df.map_partitions(
+            lambda x: npd.NestedFrame.from_flat(
+                df=x, base_columns=base_columns, nested_columns=nested_columns, index=index, name=name
+            ),
+            meta=meta,
+        )
+
+    @classmethod
+    def from_lists(cls, df, base_columns=None, list_columns=None, name="nested"):
+        """Creates a NestedFrame with base and nested columns from a dataframe
+        with list-valued columns.
+
+        Parameters
+        ----------
+        df: dd.DataFrame or nd.NestedFrame
+            A dataframe with list columns.
+        base_columns: list-like, or None
+            Any columns that have non-list values in the input df. These will
+            simply be kept as identical columns in the result
+        list_columns: list-like, or None
+            The list-value columns that should be packed into a nested column.
+            All columns in the list will attempt to be packed into a single
+            nested column with the name provided in `name`. All columns
+            in list_columns must have pyarrow list dtypes, otherwise the
+            operation will fail. If None, is defined as all columns not in
+            `base_columns`.
+        name:
+            The name of the output column the `list_columns` are packed into.
+
+        Returns
+        -------
+        NestedFrame
+            A NestedFrame with the specified nesting structure.
+
+        Note
+        ----
+        As noted above, all columns in `list_columns` must have a pyarrow
+        ListType dtype. This is needed for proper meta propagation. To convert
+        a list column to this dtype, you can use this command structure:
+        `nf= nf.astype({"colname": pd.ArrowDtype(pa.list_(pa.int64()))})`
+
+        Where pa.int64 above should be replaced with the correct dtype of the
+        underlying data accordingly.
+
+        Additionally, it's a known issue in Dask
+        (https://github.com/dask/dask/issues/10139) that columns with list
+        values will by default be converted to the string type. This will
+        interfere with the ability to recast these to pyarrow lists. We
+        recommend setting the following dask config setting to prevent this:
+        `dask.config.set({"dataframe.convert-string":False})`
+
+        """
+
+        # Resolve inputs for meta
+        if base_columns is None:
+            if list_columns is None:
+                # with no inputs, assume all columns are list-valued
+                list_columns = df.columns
+                base_columns = []  # df[None] is not a valid selection when building meta below
+            else:
+                # if list_columns are defined, assume everything else is base
+                base_columns = [col for col in df.columns if col not in list_columns]
+        else:
+            if list_columns is None:
+                # with defined base_columns, assume everything else is list
+                list_columns = [col for col in df.columns if col not in base_columns]
+
+        # from_lists should have at least one list column defined
+        if len(list_columns) == 0:
+            raise ValueError("No columns were assigned as list columns.")
+        else:
+            # reject any list columns that are not pyarrow dtyped
+            for col in list_columns:
+                if not hasattr(df[col].dtype, "pyarrow_dtype"):
+                    raise TypeError(
+                        f"""List column '{col}' dtype ({df[col].dtype}) is not a pyarrow list dtype.
+Refer to the docstring for guidance on dtype requirements and assignment."""
+                    )
+                elif not pa.types.is_list(df[col].dtype.pyarrow_dtype):
+                    raise TypeError(
+                        f"""List column '{col}' dtype ({df[col].dtype}) is not a pyarrow list dtype.
+Refer to the docstring for guidance on dtype requirements and assignment."""
+                    )
+
+        meta = npd.NestedFrame(df[base_columns]._meta)
+
+        nested_meta = pack(df[list_columns]._meta, name)
+        meta = meta.join(nested_meta)
+
+        return df.map_partitions(
+            lambda x: npd.NestedFrame.from_lists(
+                df=x, base_columns=base_columns, list_columns=list_columns, name=name
+            ),
+            meta=meta,
+        )
+
     def compute(self, **kwargs):
         """Compute this Dask collection, returning the underlying dataframe or series."""
         return npd.NestedFrame(super().compute(**kwargs))
diff --git a/tests/nested_dask/test_io.py b/tests/nested_dask/test_io.py
index c02c051..40baa32 100644
--- a/tests/nested_dask/test_io.py
+++ b/tests/nested_dask/test_io.py
@@ -1,4 +1,6 @@
 import nested_dask as nd
+import pandas as pd
+import pyarrow as pa
 
 
 def test_read_parquet(test_dataset, tmp_path):
@@ -19,6 +21,8 @@ def test_read_parquet(test_dataset, tmp_path):
     base = nd.read_parquet(test_save_path, calculate_divisions=True)
     nested = nd.read_parquet(nested_save_path, calculate_divisions=True)
 
+    # this is read as a large_string, just make it a string
+    nested = nested.astype({"band": pd.ArrowDtype(pa.string())})
     base = base.add_nested(nested, "nested")
 
     # Check the loaded dataset against the original
diff --git a/tests/nested_dask/test_nestedframe.py b/tests/nested_dask/test_nestedframe.py
index 5dead59..632b449 100644
--- a/tests/nested_dask/test_nestedframe.py
+++ b/tests/nested_dask/test_nestedframe.py
@@ -1,12 +1,16 @@
+import dask
 import dask.dataframe as dd
 import nested_dask as nd
 import nested_pandas as npd
 import numpy as np
 import pandas as pd
+import pyarrow as pa
 import pytest
 from nested_dask.datasets import generate_data
 from nested_pandas.series.dtype import NestedDtype
 
+dask.config.set({"dataframe.convert-string": False})
+
 
 def test_nestedframe_construction(test_dataset):
     """test the construction of a nestedframe"""
@@ -108,6 +112,152 @@ def test_add_nested(test_dataset_no_add_nested):
     assert len(base_with_nested.compute()) == 50
 
 
+def test_from_flat():
+    """Test the from_flat wrapping, make sure meta is assigned correctly"""
+
+    nf = nd.NestedFrame.from_pandas(
+        npd.NestedFrame(
+            {
+                "a": [1, 1, 1, 2, 2, 2],
+                "b": [2, 2, 2, 4, 4, 4],
+                "c": [1, 2, 3, 4, 5, 6],
+                "d": [2, 4, 6, 8, 10, 12],
+            },
+            index=[0, 0, 0, 1, 1, 1],
+        )
+    )
+
+    # Check full inputs
+    ndf = nd.NestedFrame.from_flat(nf, base_columns=["a", "b"], nested_columns=["c", "d"])
+    assert list(ndf.columns) == ["a", "b", "nested"]
+    assert list(ndf["nested"].nest.fields) == ["c", "d"]
+    ndf_comp = ndf.compute()
+    assert list(ndf.columns) == list(ndf_comp.columns)
+    assert list(ndf["nested"].nest.fields) == list(ndf_comp["nested"].nest.fields)
+    assert len(ndf_comp) == 2
+
+    # Check omitting a base column
+    ndf = nd.NestedFrame.from_flat(nf, base_columns=["a"], nested_columns=["c", "d"])
+    assert list(ndf.columns) == ["a", "nested"]
+    assert list(ndf["nested"].nest.fields) == ["c", "d"]
+    ndf_comp = ndf.compute()
+    assert list(ndf.columns) == list(ndf_comp.columns)
+    assert list(ndf["nested"].nest.fields) == list(ndf_comp["nested"].nest.fields)
+    assert len(ndf_comp) == 2
+
+    # Check omitting a nested column
+    ndf = nd.NestedFrame.from_flat(nf, base_columns=["a", "b"], nested_columns=["d"])
+    assert list(ndf.columns) == ["a", "b", "nested"]
+    assert list(ndf["nested"].nest.fields) == ["d"]
+    ndf_comp = ndf.compute()
+    assert list(ndf.columns) == list(ndf_comp.columns)
+    assert list(ndf["nested"].nest.fields) == list(ndf_comp["nested"].nest.fields)
+    assert len(ndf_comp) == 2
+
+    # Check no base columns
+    ndf = nd.NestedFrame.from_flat(nf, base_columns=[], nested_columns=["c", "d"])
+    assert list(ndf.columns) == ["nested"]
+    assert list(ndf["nested"].nest.fields) == ["c", "d"]
+    ndf_comp = ndf.compute()
+    assert list(ndf.columns) == list(ndf_comp.columns)
+    assert list(ndf["nested"].nest.fields) == list(ndf_comp["nested"].nest.fields)
+    assert len(ndf_comp) == 2
+
+    # Check inferred nested columns
+    ndf = nd.NestedFrame.from_flat(nf, base_columns=["a", "b"])
+    assert list(ndf.columns) == ["a", "b", "nested"]
+    assert list(ndf["nested"].nest.fields) == ["c", "d"]
+    ndf_comp = ndf.compute()
+    assert list(ndf.columns) == list(ndf_comp.columns)
+    assert list(ndf["nested"].nest.fields) == list(ndf_comp["nested"].nest.fields)
+    assert len(ndf_comp) == 2
+
+    # Check using an index
+    ndf = nd.NestedFrame.from_flat(nf, base_columns=["b"], index="a")
+    assert list(ndf.columns) == ["b", "nested"]
+    assert list(ndf["nested"].nest.fields) == ["c", "d"]
+    ndf_comp = ndf.compute()
+    assert list(ndf.columns) == list(ndf_comp.columns)
+    assert list(ndf["nested"].nest.fields) == list(ndf_comp["nested"].nest.fields)
+    assert len(ndf_comp) == 2
+
+
+def test_from_lists():
+    """Test the from_lists wrapping, make sure meta is assigned correctly"""
+
+    nf = nd.NestedFrame.from_pandas(
+        npd.NestedFrame(
+            {
+                "c": [1, 2, 3],
+                "d": [2, 4, 6],
+                "e": [[1, 2, 3], [4, 5, 6], [7, 8, 9]],
+                "f": [["dog", "cat", "bird"], ["dog", "cat", "bird"], ["dog", "cat", "bird"]],
+            },
+            index=[0, 1, 2],
+        )
+    )
+    nf = nf.astype({"e": pd.ArrowDtype(pa.list_(pa.int64())), "f": pd.ArrowDtype(pa.list_(pa.string()))})
+
+    # Check with just base_columns
+    ndf = nd.NestedFrame.from_lists(nf, base_columns=["c", "d"])
+    assert list(ndf.columns) == ["c", "d", "nested"]
+    assert list(ndf["nested"].nest.fields) == ["e", "f"]
+    ndf_comp = ndf.compute()
+    assert list(ndf.columns) == list(ndf_comp.columns)
+    assert list(ndf["nested"].nest.fields) == list(ndf_comp["nested"].nest.fields)
+
+    # Check with just list_columns
+    ndf = nd.NestedFrame.from_lists(nf, list_columns=["e", "f"])
+    assert list(ndf.columns) == ["c", "d", "nested"]
+    assert list(ndf["nested"].nest.fields) == ["e", "f"]
+    ndf_comp = ndf.compute()
+    assert list(ndf.columns) == list(ndf_comp.columns)
+    assert list(ndf["nested"].nest.fields) == list(ndf_comp["nested"].nest.fields)
+
+    # Check with base subset
+    ndf = nd.NestedFrame.from_lists(nf, base_columns=["c"], list_columns=["e", "f"])
+    assert list(ndf.columns) == ["c", "nested"]
+    assert list(ndf["nested"].nest.fields) == ["e", "f"]
+    ndf_comp = ndf.compute()
+    assert list(ndf.columns) == list(ndf_comp.columns)
+    assert list(ndf["nested"].nest.fields) == list(ndf_comp["nested"].nest.fields)
+
+    # Check with list subset
+    ndf = nd.NestedFrame.from_lists(nf, base_columns=["c", "d"], list_columns=["f"])
+    assert list(ndf.columns) == ["c", "d", "nested"]
+    assert list(ndf["nested"].nest.fields) == ["f"]
+    ndf_comp = ndf.compute()
+    assert list(ndf.columns) == list(ndf_comp.columns)
+    assert list(ndf["nested"].nest.fields) == list(ndf_comp["nested"].nest.fields)
+
+
+def test_from_lists_errors():
+    """test that the dtype errors are appropriately raised"""
+    nf = nd.NestedFrame.from_pandas(
+        npd.NestedFrame(
+            {
+                "c": [1, 2, 3],
+                "d": [2, 4, 6],
+                "e": [[1, 2, 3], [4, 5, 6], [7, 8, 9]],
+                "f": [["dog", "cat", "bird"], ["dog", "cat", "bird"], ["dog", "cat", "bird"]],
+            },
+            index=[0, 1, 2],
+        )
+    )
+    # first check for no list_column error
+    with pytest.raises(ValueError):
+        nd.NestedFrame.from_lists(nf, base_columns=["c", "d", "e", "f"])
+
+    # next check for non-pyarrow dtype in list_column
+    with pytest.raises(TypeError):
+        nd.NestedFrame.from_lists(nf, base_columns=["e"])
+
+    # And check for non-list pyarrow type in list_column
+    nf = nf.astype({"d": pd.ArrowDtype(pa.int64())})
+    with pytest.raises(TypeError):
+        nd.NestedFrame.from_lists(nf, base_columns=["d"])
+
+
 def test_query_on_base(test_dataset):
     """test the query function on base columns"""
 
@@ -199,6 +349,8 @@ def test_to_parquet_by_layer(test_dataset, tmp_path):
     loaded_base = nd.read_parquet(test_save_path / "base", calculate_divisions=True)
     loaded_nested = nd.read_parquet(test_save_path / "nested", calculate_divisions=True)
 
+    # this is read as a large_string, just make it a string
+    loaded_nested = loaded_nested.astype({"band": pd.ArrowDtype(pa.string())})
     loaded_dataset = loaded_base.add_nested(loaded_nested, "nested")
 
     # Check for equivalence