diff --git a/benchmarks/benchmarks.py b/benchmarks/benchmarks.py index 71f2616..c07f397 100644 --- a/benchmarks/benchmarks.py +++ b/benchmarks/benchmarks.py @@ -3,14 +3,112 @@ For more information on writing benchmarks: https://asv.readthedocs.io/en/stable/writing_benchmarks.html.""" -from dask_nested import example_benchmarks +import dask_nested as dn +import nested_pandas as npd +import numpy as np +import pandas as pd -def time_computation(): - """Time computations are prefixed with 'time'.""" - example_benchmarks.runtime_computation() +def _generate_benchmark_data(add_nested=True): + """generate a dataset for benchmarks""" + n_base = 100 + layer_size = 1000 -def mem_list(): - """Memory computations are prefixed with 'mem' or 'peakmem'.""" - return example_benchmarks.memory_computation() + # use provided seed, "None" acts as if no seed is provided + randomstate = np.random.RandomState(seed=1) + + # Generate base data + base_data = {"a": randomstate.random(n_base), "b": randomstate.random(n_base) * 2} + base_nf = npd.NestedFrame(data=base_data) + + layer_data = { + "t": randomstate.random(layer_size * n_base) * 20, + "flux": randomstate.random(layer_size * n_base) * 100, + "band": randomstate.choice(["r", "g"], size=layer_size * n_base), + "index": np.arange(layer_size * n_base) % n_base, + } + layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index() + + # Convert to Dask + base_nf = dn.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=5) + layer_nf = dn.NestedFrame.from_nested_pandas(layer_nf).repartition(npartitions=50) + + # Return based on add_nested + if add_nested: + base_nf = base_nf.add_nested(layer_nf, "nested") + return base_nf + else: + return base_nf, layer_nf + + +class NestedFrameAddNested: + """Benchmark the NestedFrame.add_nested function""" + + n_base = 100 + layer_size = 1000 + base_nf = dn.NestedFrame + layer_nf = dn.NestedFrame + + def setup(self): + """Set up the benchmark environment""" + self.base_nf, self.layer_nf = _generate_benchmark_data(add_nested=False) + + def run(self): + """Run the benchmark.""" + self.base_nf.add_nested(self.layer_nf, "nested").compute() + + def time_run(self): + """Benchmark the runtime of adding a nested layer""" + self.run() + + def peakmem_run(self): + """Benchmark the memory usage of adding a nested layer""" + self.run() + + +class NestedFrameReduce: + """Benchmark the NestedFrame.reduce function""" + + nf = dn.NestedFrame + + def setup(self): + """Set up the benchmark environment""" + self.nf = _generate_benchmark_data(add_nested=True) + + def run(self): + """Run the benchmark.""" + meta = pd.Series(name="mean", dtype=float) + self.nf.reduce(np.mean, "nested.flux", meta=meta).compute() + + def time_run(self): + """Benchmark the runtime of applying the reduce function""" + self.run() + + def peakmem_run(self): + """Benchmark the memory usage of applying the reduce function""" + self.run() + + +class NestedFrameQuery: + """Benchmark the NestedFrame.query function""" + + nf = dn.NestedFrame + + def setup(self): + """Set up the benchmark environment""" + self.nf = _generate_benchmark_data(add_nested=True) + + def run(self): + """Run the benchmark.""" + + # Apply nested layer query + self.nf = self.nf.query("nested.band == 'g'").compute() + + def time_run(self): + """Benchmark the runtime of applying the two queries""" + self.run() + + def peakmem_run(self): + """Benchmark the memory usage of applying the two queries""" + self.run() diff --git a/pyproject.toml b/pyproject.toml index c5ac0c4..bf8e911 
100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,6 +16,12 @@ classifiers = [
 dynamic = ["version"]
 requires-python = ">=3.9"
 dependencies = [
+    'nested-pandas',
+    'numpy',
+    'dask>=2024.3.0',
+    'dask[distributed]>=2024.3.0',
+    'dask_expr',
+    'pyarrow',
 ]
 
 [project.urls]
diff --git a/src/dask_nested/__init__.py b/src/dask_nested/__init__.py
index b564b85..faad669 100644
--- a/src/dask_nested/__init__.py
+++ b/src/dask_nested/__init__.py
@@ -1,3 +1,4 @@
-from .example_module import greetings, meaning
-
-__all__ = ["greetings", "meaning"]
+from . import backends, accessor  # noqa
+from .core import NestedFrame  # noqa
+from .io import read_parquet  # noqa
+from .datasets import generate_data  # noqa
diff --git a/src/dask_nested/accessor.py b/src/dask_nested/accessor.py
new file mode 100644
index 0000000..ceb8fb9
--- /dev/null
+++ b/src/dask_nested/accessor.py
@@ -0,0 +1,68 @@
+# Python 3.9 doesn't support "|" for types
+from __future__ import annotations
+
+import dask.dataframe as dd
+import nested_pandas as npd
+from dask.dataframe.extensions import register_series_accessor
+from nested_pandas import NestedDtype
+
+
+@register_series_accessor("nest")
+class DaskNestSeriesAccessor(npd.NestSeriesAccessor):
+    """The nested-dask version of the nested-pandas NestSeriesAccessor.
+
+    Note that this has a very limited implementation relative to nested-pandas.
+
+    Parameters
+    ----------
+    series: dd.Series
+        A series to tie to the accessor
+    """
+
+    def __init__(self, series):
+        self._check_series(series)
+
+        self._series = series
+
+    @staticmethod
+    def _check_series(series):
+        """Check the validity of the tied series dtype"""
+        dtype = series.dtype
+        if not isinstance(dtype, NestedDtype):
+            raise AttributeError(f"Can only use .nest accessor with a Series of NestedDtype, got {dtype}")
+
+    @property
+    def fields(self) -> list[str]:
+        """Names of the nested columns"""
+
+        return list(self._series.dtype.fields)
+
+    def to_lists(self, fields: list[str] | None = None) -> dd.DataFrame:
+        """Convert nested series into dataframe of list-array columns
+
+        Parameters
+        ----------
+        fields : list[str] or None, optional
+            Names of the fields to include. Default is None, which means all fields.
+
+        Returns
+        -------
+        dd.DataFrame
+            Dataframe of list-arrays.
+        """
+        return self._series.map_partitions(lambda x: x.nest.to_lists(fields=fields))
+
+    def to_flat(self, fields: list[str] | None = None) -> dd.DataFrame:
+        """Convert nested series into dataframe of flat arrays
+
+        Parameters
+        ----------
+        fields : list[str] or None, optional
+            Names of the fields to include. Default is None, which means all fields.
+
+        Returns
+        -------
+        dd.DataFrame
+            Dataframe of flat arrays.
+ """ + return self._series.map_partitions(lambda x: x.nest.to_flat(fields=fields)) diff --git a/src/dask_nested/backends.py b/src/dask_nested/backends.py new file mode 100644 index 0000000..bd589f2 --- /dev/null +++ b/src/dask_nested/backends.py @@ -0,0 +1,42 @@ +# Python 3.9 doesn't support "|" for types +from __future__ import annotations + +import nested_pandas as npd +import pandas as pd +from dask.dataframe.backends import meta_nonempty_dataframe +from dask.dataframe.dispatch import make_meta_dispatch +from dask.dataframe.extensions import make_array_nonempty +from dask.dataframe.utils import meta_nonempty +from dask_expr import get_collection_type +from nested_pandas.series.ext_array import NestedExtensionArray + +from .core import NestedFrame + +get_collection_type.register(npd.NestedFrame, lambda _: NestedFrame) + +# The following dispatch functions are defined as per the Dask extension guide: +# https://docs.dask.org/en/latest/dataframe-extend.html + + +@make_meta_dispatch.register(npd.NestedFrame) +def make_meta_frame(x, index=None) -> npd.NestedFrame: + """Create an empty NestedFrame to use as Dask's underlying object meta.""" + + dtypes = x.dtypes.to_dict() + result = npd.NestedFrame({key: pd.Series(dtype=d) for key, d in dtypes.items()}) + return result + + +@meta_nonempty.register(npd.NestedFrame) +def _nonempty_nestedframe(x, index=None) -> npd.NestedFrame: + """Construct a new NestedFrame with the same underlying data.""" + df = meta_nonempty_dataframe(x) + return npd.NestedFrame(df) + + +@make_array_nonempty.register(npd.NestedDtype) +def _(dtype) -> NestedExtensionArray: + """Register a valid dtype for the NestedExtensionArray""" + # must be two values to avoid a length error in meta inference + # Dask seems to explicitly require meta dtypes to have length 2. + return NestedExtensionArray._from_sequence([pd.NA, pd.NA], dtype=dtype) diff --git a/src/dask_nested/core.py b/src/dask_nested/core.py new file mode 100644 index 0000000..a93b3da --- /dev/null +++ b/src/dask_nested/core.py @@ -0,0 +1,403 @@ +from __future__ import annotations + +import os + +import dask.dataframe as dd +import dask_expr as dx +import nested_pandas as npd +from dask_expr._collection import new_collection +from nested_pandas.series.dtype import NestedDtype +from nested_pandas.series.packer import pack_flat +from pandas._libs import lib +from pandas._typing import AnyAll, Axis, IndexLabel +from pandas.api.extensions import no_default + +# need this for the base _Frame class +# mypy: disable-error-code="misc" + + +class _Frame(dx.FrameBase): # type: ignore + """Base class for extensions of Dask Dataframes.""" + + _partition_type = npd.NestedFrame + + def __init__(self, expr): + super().__init__(expr) + + @property + def _args(self): + # Ensure our Dask extension can correctly be used by pickle. + # See https://github.com/geopandas/dask-geopandas/issues/237 + return super()._args + + def optimize(self, fuse: bool = True): + result = new_collection(self.expr.optimize(fuse=fuse)) + return result + + def __dask_postpersist__(self): + func, args = super().__dask_postpersist__() + + return self._rebuild, (func, args) + + def _rebuild(self, graph, func, args): # type: ignore + collection = func(graph, *args) + return collection + + +class NestedFrame( + _Frame, dd.DataFrame +): # can use dd.DataFrame instead of dx.DataFrame if the config is set true (default in >=2024.3.0) + """An extension for a Dask Dataframe that has Nested-Pandas functionality. 
+ + Examples + -------- + >>> import nested_dask as nd + >>> base = nd.NestedFrame(base_data) + >>> layer = nd.NestedFrame(layer_data) + >>> base.add_nested(layer, "layer") + """ + + _partition_type = npd.NestedFrame # Tracks the underlying data type + + def __getitem__(self, key): + result = super().__getitem__(key) + return result + + @classmethod + def from_nested_pandas( + cls, + data, + npartitions=None, + chunksize=None, + sort=True, + ) -> NestedFrame: + """Returns an Nested-Dask NestedFrame constructed from a Nested-Pandas + NestedFrame. + + Parameters + ---------- + data: `NestedFrame` + Nested-Pandas NestedFrame containing the underlying data + npartitions: `int`, optional + The number of partitions of the index to create. Note that depending on + the size and index of the dataframe, the output may have fewer + partitions than requested. + chunksize: `int`, optional + The desired number of rows per index partition to use. Note that + depending on the size and index of the dataframe, actual partition + sizes may vary. + sort: `bool`, optional + Whether to sort the frame by a default index. + + Returns + ---------- + result: `NestedFrame` + The constructed Dask-Nested NestedFrame object. + """ + result = dd.from_pandas(data, npartitions=npartitions, chunksize=chunksize, sort=sort) + return NestedFrame.from_dask_dataframe(result) + + @classmethod + def from_dask_dataframe(cls, df) -> NestedFrame: + """Converts a Dask Dataframe to a Dask-Nested NestedFrame + + Parameters + ---------- + df: + A Dask Dataframe to convert + + Returns + ------- + `dask_nested.NestedFrame` + """ + return df.map_partitions(npd.NestedFrame) + + def compute(self, **kwargs): + """Compute this Dask collection, returning the underlying dataframe or series.""" + return npd.NestedFrame(super().compute(**kwargs)) + + @property + def all_columns(self) -> dict: + """returns a dictionary of columns for each base/nested dataframe""" + all_columns = {"base": self.columns} + for column in self.columns: + if isinstance(self[column].dtype, NestedDtype): + nest_cols = list(self.dtypes[column].fields.keys()) + all_columns[column] = nest_cols + return all_columns + + @property + def nested_columns(self) -> list: + """retrieves the base column names for all nested dataframes""" + nest_cols = [] + for column in self.columns: + if isinstance(self[column].dtype, NestedDtype): + nest_cols.append(column) + return nest_cols + + def add_nested(self, nested, name, how="outer") -> NestedFrame: # type: ignore[name-defined] # noqa: F821 + """Packs a dataframe into a nested column + + Parameters + ---------- + nested: + A flat dataframe to pack into a nested column + name: + The name given to the nested column + how: {‘left’, ‘right’, ‘outer’, ‘inner’, ‘cross’}, default ‘outer’ + How to handle the operation of the two objects. + + * left: use calling frame’s index (or column if on is specified) + + * right: use other’s index. + + * outer: form union of calling frame’s index (or column if on is + specified) with other’s index, and sort it lexicographically. + + * inner: form intersection of calling frame’s index (or column if + on is specified) with other’s index, preserving the order of the + calling’s one. + + * cross: creates the cartesian product from both frames, preserves + the order of the left keys. 
+ + Returns + ------- + `dask_nested.NestedFrame` + """ + nested = nested.map_partitions(lambda x: pack_flat(x)).rename(name) + return self.join(nested, how=how) + + def query(self, expr) -> Self: # type: ignore # noqa: F821: + """ + Query the columns of a NestedFrame with a boolean expression. Specified + queries can target nested columns in addition to the typical column set + + Docstring copied from nested-pandas query + + Parameters + ---------- + expr : str + The query string to evaluate. + + Access nested columns using `nested_df.nested_col` (where + `nested_df` refers to a particular nested dataframe and + `nested_col` is a column of that nested dataframe). + + You can refer to variables + in the environment by prefixing them with an '@' character like + ``@a + b``. + + You can refer to column names that are not valid Python variable names + by surrounding them in backticks. Thus, column names containing spaces + or punctuations (besides underscores) or starting with digits must be + surrounded by backticks. (For example, a column named "Area (cm^2)" would + be referenced as ```Area (cm^2)```). Column names which are Python keywords + (like "list", "for", "import", etc) cannot be used. + + For example, if one of your columns is called ``a a`` and you want + to sum it with ``b``, your query should be ```a a` + b``. + + Returns + ------- + DataFrame + DataFrame resulting from the provided query expression. + + Notes + ----- + Queries that target a particular nested structure return a dataframe + with rows of that particular nested structure filtered. For example, + querying the NestedFrame "df" with nested structure "my_nested" as + below will return all rows of df, but with mynested filtered by the + condition: + + >>> df.query("mynested.a > 2") + """ + return self.map_partitions(lambda x: x.query(expr), meta=self._meta) + + def dropna( + self, + *, + axis: Axis = 0, + how: AnyAll | lib.NoDefault = no_default, + thresh: int | lib.NoDefault = no_default, + on_nested: bool = False, + subset: IndexLabel | None = None, + inplace: bool = False, + ignore_index: bool = False, + ) -> Self: # type: ignore[name-defined] # noqa: F821: + """ + Remove missing values for one layer of the NestedFrame. + + Parameters + ---------- + axis : {0 or 'index', 1 or 'columns'}, default 0 + Determine if rows or columns which contain missing values are + removed. + + * 0, or 'index' : Drop rows which contain missing values. + * 1, or 'columns' : Drop columns which contain missing value. + + Only a single axis is allowed. + + how : {'any', 'all'}, default 'any' + Determine if row or column is removed from DataFrame, when we have + at least one NA or all NA. + + * 'any' : If any NA values are present, drop that row or column. + * 'all' : If all values are NA, drop that row or column. + thresh : int, optional + Require that many non-NA values. Cannot be combined with how. + on_nested : str or bool, optional + If not False, applies the call to the nested dataframe in the + column with label equal to the provided string. If specified, + the nested dataframe should align with any columns given in + `subset`. + subset : column label or sequence of labels, optional + Labels along other axis to consider, e.g. if you are dropping rows + these would be a list of columns to include. + + Access nested columns using `nested_df.nested_col` (where + `nested_df` refers to a particular nested dataframe and + `nested_col` is a column of that nested dataframe). 
+        inplace : bool, default False
+            Whether to modify the DataFrame rather than creating a new one.
+        ignore_index : bool, default ``False``
+            If ``True``, the resulting axis will be labeled 0, 1, …, n - 1.
+
+            .. versionadded:: 2.0.0
+
+        Returns
+        -------
+        DataFrame or None
+            DataFrame with NA entries dropped from it or None if ``inplace=True``.
+
+        Notes
+        -----
+        Operations that target a particular nested structure return a dataframe
+        with rows of that particular nested structure affected.
+
+        Values for `on_nested` and `subset` should be consistent in pointing
+        to a single layer; multi-layer operations are not supported at this
+        time.
+        """
+        # propagate meta, assumes row-based operation
+        return self.map_partitions(
+            lambda x: x.dropna(
+                axis=axis,
+                how=how,
+                thresh=thresh,
+                on_nested=on_nested,
+                subset=subset,
+                inplace=inplace,
+                ignore_index=ignore_index,
+            ),
+            meta=self._meta,
+        )
+
+    def reduce(self, func, *args, meta=None, **kwargs) -> NestedFrame:
+        """
+        Takes a function and applies it to each top-level row of the NestedFrame.
+
+        Docstring copied from nested-pandas.
+
+        The user may specify which columns the function is applied to, with
+        columns from the 'base' layer being passed to the function as
+        scalars and columns from the nested layers being passed as numpy arrays.
+
+        Parameters
+        ----------
+        func : callable
+            Function to apply to each nested dataframe. The first arguments to `func` should be which
+            columns to apply the function to.
+        args : positional arguments
+            Positional arguments to pass to the function, the first *args should be the names of the
+            columns to apply the function to.
+        meta : dataframe or series-like, optional
+            The dask meta of the output.
+        kwargs : keyword arguments, optional
+            Keyword arguments to pass to the function.
+
+        Returns
+        -------
+        `NestedFrame`
+            `NestedFrame` with the results of the function applied to the columns of the frame.
+
+        Notes
+        -----
+        The recommended return value of func should be a `pd.Series` where the indices are the names of the
+        output columns in the dataframe returned by `reduce`. Note however that in cases where func
+        returns a single value there may be a performance benefit to returning the scalar value
+        rather than a `pd.Series`.
+        """
+
+        # apply nested_pandas reduce via map_partitions
+        return self.map_partitions(lambda x: x.reduce(func, *args, **kwargs), meta=meta)
+
+    def to_parquet(self, path, by_layer=True, **kwargs) -> None:
+        """Creates parquet file(s) with the data of a NestedFrame, either
+        as a single parquet file directory where each nested dataset is packed
+        into its own column or as an individual parquet file directory for each
+        layer.
+
+        Docstring copied from nested-pandas.
+
+        Note that here we always opt to use the pyarrow engine for writing
+        parquet files.
+
+        Parameters
+        ----------
+        path : str
+            The path to the parquet directory to be written.
+        by_layer : bool, default True
+            NOTE: by_layer=False will not reliably preserve divisions currently,
+            be warned when using it that loading from such a dataset will
+            likely require you to reset and set the index to generate divisions
+            information.
+
+            If False, writes the entire NestedFrame to a single parquet
+            directory.
+
+            If True, writes each layer to a separate parquet sub-directory
+            within the directory specified by path. The filename for each
+            outputted file will be named after its layer. For example for the
+            base layer this is always "base".
+ kwargs : keyword arguments, optional + Keyword arguments to pass to the function. + + Returns + ------- + None + """ + + # code copied from nested-pandas rather than wrapped + # reason being that a map_partitions call is probably not well-behaved here? + + if not by_layer: + # Todo: Investigate this more + # Divisions cannot be generated from a parquet file that stores + # nested information without a reset_index().set_index() loop. It + # seems like this happens at the to_parquet level rather than + # in read_parquet as dropping the nested columns from the dataframe + # to save does enable divisions to be found, but removing the + # nested columns from the set of columns to load does not. + # Divisions are going to be crucial, and so I think it's best to + # not support this until this is resolved. However the non-by_layer + # mode is needed for by_layer so it may be best to just settle for + # changing the default and filing a higher-priority bug. + # raise NotImplementedError + + # We just defer to the pandas to_parquet method if we're not writing by layer + # or there is only one layer in the NestedFrame. + super().to_parquet(path, engine="pyarrow", **kwargs) + else: + # Write the base layer to a parquet file + base_frame = self.drop(columns=self.nested_columns) + base_frame.to_parquet(os.path.join(path, "base"), by_layer=False, **kwargs) + + # Write each nested layer to a parquet file + for layer in self.all_columns: + if layer != "base": + path_layer = os.path.join(path, f"{layer}") + self[layer].nest.to_flat().to_parquet(path_layer, engine="pyarrow", **kwargs) + return None diff --git a/src/dask_nested/datasets/__init__.py b/src/dask_nested/datasets/__init__.py new file mode 100644 index 0000000..4b8e827 --- /dev/null +++ b/src/dask_nested/datasets/__init__.py @@ -0,0 +1 @@ +from .generation import * # noqa diff --git a/src/dask_nested/datasets/generation.py b/src/dask_nested/datasets/generation.py new file mode 100644 index 0000000..d5b296f --- /dev/null +++ b/src/dask_nested/datasets/generation.py @@ -0,0 +1,42 @@ +from nested_pandas import datasets + +import dask_nested as dn + + +def generate_data(n_base, n_layer, npartitions=1, seed=None) -> dn.NestedFrame: + """Generates a toy dataset. + + Docstring copied from nested-pandas. + + Parameters + ---------- + n_base : int + The number of rows to generate for the base layer + n_layer : int, or dict + The number of rows per n_base row to generate for a nested layer. + Alternatively, a dictionary of layer label, layer_size pairs may be + specified to created multiple nested columns with custom sizing. + npartitions: int + The number of partitions to split the data into. + seed : int + A seed to use for random generation of data + + Returns + ------- + NestedFrame + The constructed Dask NestedFrame. 
+ + Examples + -------- + >>> import dask_nested as dn + >>> dn.datasets.generate_data(10,100) + >>> dn.datasets.generate_data(10, {"nested_a": 100, "nested_b": 200}) + """ + + # Use nested-pandas generator + base_nf = datasets.generate_data(n_base, n_layer, seed=seed) + + # Convert to dask-nested + base_nf = dn.NestedFrame.from_nested_pandas(base_nf).repartition(npartitions=npartitions) + + return base_nf diff --git a/src/dask_nested/example_benchmarks.py b/src/dask_nested/example_benchmarks.py deleted file mode 100644 index 5a77b06..0000000 --- a/src/dask_nested/example_benchmarks.py +++ /dev/null @@ -1,14 +0,0 @@ -"""An example module containing simplistic methods under benchmarking.""" - -import random -import time - - -def runtime_computation(): - """Runtime computation consuming between 0 and 5 seconds.""" - time.sleep(random.uniform(0, 5)) - - -def memory_computation(): - """Memory computation for a random list up to 512 samples.""" - return [0] * random.randint(0, 512) diff --git a/src/dask_nested/example_module.py b/src/dask_nested/example_module.py deleted file mode 100644 index f76e837..0000000 --- a/src/dask_nested/example_module.py +++ /dev/null @@ -1,23 +0,0 @@ -"""An example module containing simplistic functions.""" - - -def greetings() -> str: - """A friendly greeting for a future friend. - - Returns - ------- - str - A typical greeting from a software engineer. - """ - return "Hello from LINCC-Frameworks!" - - -def meaning() -> int: - """The meaning of life, the universe, and everything. - - Returns - ------- - int - The meaning of life. - """ - return 42 diff --git a/src/dask_nested/io.py b/src/dask_nested/io.py new file mode 100644 index 0000000..325c450 --- /dev/null +++ b/src/dask_nested/io.py @@ -0,0 +1,238 @@ +# Python 3.9 doesn't support "|" for types +from __future__ import annotations + +import dask.dataframe as dd + +from .core import NestedFrame + + +def read_parquet( + path, + columns=None, + filters=None, + categories=None, + index=None, + storage_options=None, + engine="auto", + use_nullable_dtypes: bool | None = None, + dtype_backend=None, + calculate_divisions=None, + ignore_metadata_file=False, + metadata_task_size=None, + split_row_groups="infer", + blocksize="default", + aggregate_files=None, + parquet_file_extension=(".parq", ".parquet", ".pq"), + filesystem=None, + **kwargs, +) -> NestedFrame: + """ + Read a Parquet file into a Dask DataFrame + + This reads a directory of Parquet data into a Dask.dataframe, one file per + partition. It selects the index among the sorted columns if any exist. + + Docstring copied from `dask.dataframe.read_parquet` + + Parameters + ---------- + path : str or list + Source directory for data, or path(s) to individual parquet files. + Prefix with a protocol like ``s3://`` to read from alternative + filesystems. To read from multiple files you can pass a globstring or a + list of paths, with the caveat that they must all have the same + protocol. + columns : str or list, default None + Field name(s) to read in as columns in the output. By default all + non-index fields will be read (as determined by the pandas parquet + metadata, if present). Provide a single field name instead of a list to + read in the data as a Series. + filters : Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], default None + List of filters to apply, like ``[[('col1', '==', 0), ...], ...]``. + Using this argument will result in row-wise filtering of the final partitions. 
+ + Predicates can be expressed in disjunctive normal form (DNF). This means that + the inner-most tuple describes a single column predicate. These inner predicates + are combined with an AND conjunction into a larger predicate. The outer-most + list then combines all of the combined filters with an OR disjunction. + + Predicates can also be expressed as a ``List[Tuple]``. These are evaluated + as an AND conjunction. To express OR in predicates, one must use the + (preferred for "pyarrow") ``List[List[Tuple]]`` notation. + index : str, list or False, default None + Field name(s) to use as the output frame index. By default will be + inferred from the pandas parquet file metadata, if present. Use ``False`` + to read all fields as columns. + categories : list or dict, default None + For any fields listed here, if the parquet encoding is Dictionary, + the column will be created with dtype category. Use only if it is + guaranteed that the column is encoded as dictionary in all row-groups. + If a list, assumes up to 2**16-1 labels; if a dict, specify the number + of labels expected; if None, will load categories automatically for + data written by dask, not otherwise. + storage_options : dict, default None + Key/value pairs to be passed on to the file-system backend, if any. + Note that the default file-system backend can be configured with the + ``filesystem`` argument, described below. + open_file_options : dict, default None + Key/value arguments to be passed along to ``AbstractFileSystem.open`` + when each parquet data file is open for reading. Experimental + (optimized) "precaching" for remote file systems (e.g. S3, GCS) can + be enabled by adding ``{"method": "parquet"}`` under the + ``"precache_options"`` key. Also, a custom file-open function can be + used (instead of ``AbstractFileSystem.open``), by specifying the + desired function under the ``"open_file_func"`` key. + engine : {'auto', 'pyarrow'} + Parquet library to use. Defaults to 'auto', which uses ``pyarrow`` if + it is installed, and falls back to the deprecated ``fastparquet`` otherwise. + Note that ``fastparquet`` does not support all functionality offered by + ``pyarrow``. + This is also used by third-party packages (e.g. CuDF) to inject bespoke engines. + use_nullable_dtypes : {False, True} + Whether to use extension dtypes for the resulting ``DataFrame``. + + .. note:: + + This option is deprecated. Use "dtype_backend" instead. + + dtype_backend : {'numpy_nullable', 'pyarrow'}, defaults to NumPy backed DataFrames + Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays, + nullable dtypes are used for all dtypes that have a nullable implementation + when 'numpy_nullable' is set, pyarrow is used for all dtypes if 'pyarrow' + is set. + ``dtype_backend="pyarrow"`` requires ``pandas`` 1.5+. + calculate_divisions : bool, default False + Whether to use min/max statistics from the footer metadata (or global + ``_metadata`` file) to calculate divisions for the output DataFrame + collection. Divisions will not be calculated if statistics are missing. + This option will be ignored if ``index`` is not specified and there is + no physical index column specified in the custom "pandas" Parquet + metadata. Note that ``calculate_divisions=True`` may be extremely slow + when no global ``_metadata`` file is present, especially when reading + from remote storage. Set this to ``True`` only when known divisions + are needed for your workload (see :ref:`dataframe-design-partitions`). 
+ ignore_metadata_file : bool, default False + Whether to ignore the global ``_metadata`` file (when one is present). + If ``True``, or if the global ``_metadata`` file is missing, the parquet + metadata may be gathered and processed in parallel. Parallel metadata + processing is currently supported for ``ArrowDatasetEngine`` only. + metadata_task_size : int, default configurable + If parquet metadata is processed in parallel (see ``ignore_metadata_file`` + description above), this argument can be used to specify the number of + dataset files to be processed by each task in the Dask graph. If this + argument is set to ``0``, parallel metadata processing will be disabled. + The default values for local and remote filesystems can be specified + with the "metadata-task-size-local" and "metadata-task-size-remote" + config fields, respectively (see "dataframe.parquet"). + split_row_groups : 'infer', 'adaptive', bool, or int, default 'infer' + If True, then each output dataframe partition will correspond to a single + parquet-file row-group. If False, each partition will correspond to a + complete file. If a positive integer value is given, each dataframe + partition will correspond to that number of parquet row-groups (or fewer). + If 'adaptive', the metadata of each file will be used to ensure that every + partition satisfies ``blocksize``. If 'infer' (the default), the + uncompressed storage-size metadata in the first file will be used to + automatically set ``split_row_groups`` to either 'adaptive' or ``False``. + blocksize : int or str, default 'default' + The desired size of each output ``DataFrame`` partition in terms of total + (uncompressed) parquet storage space. This argument is currently used to + set the default value of ``split_row_groups`` (using row-group metadata + from a single file), and will be ignored if ``split_row_groups`` is not + set to 'infer' or 'adaptive'. Default is 256 MiB. + aggregate_files : bool or str, default None + WARNING: Passing a string argument to ``aggregate_files`` will result + in experimental behavior. This behavior may change in the future. + + Whether distinct file paths may be aggregated into the same output + partition. This parameter is only used when `split_row_groups` is set to + 'infer', 'adaptive' or to an integer >1. A setting of True means that any + two file paths may be aggregated into the same output partition, while + False means that inter-file aggregation is prohibited. + + For "hive-partitioned" datasets, a "partition"-column name can also be + specified. In this case, we allow the aggregation of any two files + sharing a file path up to, and including, the corresponding directory name. + For example, if ``aggregate_files`` is set to ``"section"`` for the + directory structure below, ``03.parquet`` and ``04.parquet`` may be + aggregated together, but ``01.parquet`` and ``02.parquet`` cannot be. + If, however, ``aggregate_files`` is set to ``"region"``, ``01.parquet`` + may be aggregated with ``02.parquet``, and ``03.parquet`` may be aggregated + with ``04.parquet``:: + + dataset-path/ + ├── region=1/ + │ ├── section=a/ + │ │ └── 01.parquet + │ ├── section=b/ + │ └── └── 02.parquet + └── region=2/ + ├── section=a/ + │ ├── 03.parquet + └── └── 04.parquet + + Note that the default behavior of ``aggregate_files`` is ``False``. + parquet_file_extension: str, tuple[str], or None, default (".parq", ".parquet", ".pq") + A file extension or an iterable of extensions to use when discovering + parquet files in a directory. 
Files that don't match these extensions + will be ignored. This argument only applies when ``paths`` corresponds + to a directory and no ``_metadata`` file is present (or + ``ignore_metadata_file=True``). Passing in ``parquet_file_extension=None`` + will treat all files in the directory as parquet files. + + The purpose of this argument is to ensure that the engine will ignore + unsupported metadata files (like Spark's '_SUCCESS' and 'crc' files). + It may be necessary to change this argument if the data files in your + parquet dataset do not end in ".parq", ".parquet", or ".pq". + filesystem: "fsspec", "arrow", or fsspec.AbstractFileSystem backend to use. + Specifies the backend to use. + dataset: dict, default None + Dictionary of options to use when creating a ``pyarrow.dataset.Dataset`` object. + These options may include a "filesystem" key to configure the desired + file-system backend. However, the top-level ``filesystem`` argument will always + take precedence. + + **Note**: The ``dataset`` options may include a "partitioning" key. + However, since ``pyarrow.dataset.Partitioning`` + objects cannot be serialized, the value can be a dict of key-word + arguments for the ``pyarrow.dataset.partitioning`` API + (e.g. ``dataset={"partitioning": {"flavor": "hive", "schema": ...}}``). + Note that partitioned columns will not be converted to categorical + dtypes when a custom partitioning schema is specified in this way. + read: dict, default None + Dictionary of options to pass through to ``engine.read_partitions`` + using the ``read`` key-word argument. + arrow_to_pandas: dict, default None + Dictionary of options to use when converting from ``pyarrow.Table`` to + a pandas ``DataFrame`` object. Only used by the "arrow" engine. + **kwargs: dict (of dicts) + Options to pass through to ``engine.read_partitions`` as stand-alone + key-word arguments. Note that these options will be ignored by the + engines defined in ``dask.dataframe``, but may be used by other custom + implementations. 
+ + Examples + -------- + >>> df = dd.read_parquet('s3://bucket/my-parquet-data') # doctest: +SKIP + """ + return NestedFrame.from_dask_dataframe( + dd.read_parquet( + path=path, + columns=columns, + filters=filters, + categories=categories, + index=index, + storage_options=storage_options, + engine=engine, + use_nullable_dtypes=use_nullable_dtypes, + dtype_backend=dtype_backend, + calculate_divisions=calculate_divisions, + ignore_metadata_file=ignore_metadata_file, + metadata_task_size=metadata_task_size, + split_row_groups=split_row_groups, + blocksize=blocksize, + aggregate_files=aggregate_files, + parquet_file_extension=parquet_file_extension, + filesystem=filesystem, + **kwargs, + ) + ) diff --git a/tests/dask_nested/conftest.py b/tests/dask_nested/conftest.py index e69de29..30b2196 100644 --- a/tests/dask_nested/conftest.py +++ b/tests/dask_nested/conftest.py @@ -0,0 +1,84 @@ +import dask_nested as dn +import nested_pandas as npd +import numpy as np +import pytest + + +@pytest.fixture +def test_dataset(): + """create a toy dataset for testing purposes""" + n_base = 50 + layer_size = 500 + randomstate = np.random.RandomState(seed=1) + + # Generate base data + base_data = {"a": randomstate.random(n_base), "b": randomstate.random(n_base) * 2} + base_nf = npd.NestedFrame(data=base_data) + + layer_data = { + "t": randomstate.random(layer_size * n_base) * 20, + "flux": randomstate.random(layer_size * n_base) * 100, + "band": randomstate.choice(["r", "g"], size=layer_size * n_base), + "index": np.arange(layer_size * n_base) % n_base, + } + layer_nf = npd.NestedFrame(data=layer_data).set_index("index").sort_index() + + base_dn = dn.NestedFrame.from_nested_pandas(base_nf, npartitions=5) + layer_dn = dn.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) + + return base_dn.add_nested(layer_dn, "nested") + + +@pytest.fixture +def test_dataset_with_nans(): + """stop before add_nested""" + n_base = 50 + layer_size = 500 + randomstate = np.random.RandomState(seed=1) + + # Generate base data + a = randomstate.random(n_base) + a[10] = np.nan # add a nan + base_data = {"a": a, "b": randomstate.random(n_base) * 2} + base_nf = npd.NestedFrame(data=base_data) + + t = randomstate.random(layer_size * n_base) * 20 + t[50] = np.nan # add a nan + + layer_data = { + "t": t, + "flux": randomstate.random(layer_size * n_base) * 100, + "band": randomstate.choice(["r", "g"], size=layer_size * n_base), + "index": np.arange(layer_size * n_base) % n_base, + } + layer_nf = npd.NestedFrame(data=layer_data).set_index("index") + + base_dn = dn.NestedFrame.from_nested_pandas(base_nf, npartitions=5) + layer_dn = dn.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) + + return base_dn.add_nested(layer_dn, "nested") + + +@pytest.fixture +def test_dataset_no_add_nested(): + """stop before add_nested""" + n_base = 50 + layer_size = 500 + randomstate = np.random.RandomState(seed=1) + + # Generate base data + base_data = {"a": randomstate.random(n_base), "b": randomstate.random(n_base) * 2} + base_nf = npd.NestedFrame(data=base_data) + + layer_data = { + "t": randomstate.random(layer_size * n_base) * 20, + "flux": randomstate.random(layer_size * n_base) * 100, + "band": randomstate.choice(["r", "g"], size=layer_size * n_base), + "index": np.arange(layer_size * n_base) % n_base, + } + layer_nf = npd.NestedFrame(data=layer_data).set_index("index") + + base_dn = dn.NestedFrame.from_nested_pandas(base_nf, npartitions=5) + layer_dn = dn.NestedFrame.from_nested_pandas(layer_nf, npartitions=10) + + return (base_dn, layer_dn) 
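To make the intent of these fixtures concrete, here is a minimal usage sketch of the `.nest` accessor and `add_nested` that the tests below exercise. It is illustrative only and not part of the patch; it assumes the behavior defined in `src/dask_nested/accessor.py` and `src/dask_nested/core.py`, and the small sizes are arbitrary.

```python
import dask_nested as dn
import nested_pandas as npd
import numpy as np

# Build a tiny base/layer pair the same way the fixtures above do
rng = np.random.RandomState(seed=1)
base = npd.NestedFrame({"a": rng.random(5), "b": rng.random(5) * 2})
layer = npd.NestedFrame(
    {"t": rng.random(50), "flux": rng.random(50) * 100, "index": np.arange(50) % 5}
).set_index("index").sort_index()

base_dn = dn.NestedFrame.from_nested_pandas(base, npartitions=2)
layer_dn = dn.NestedFrame.from_nested_pandas(layer, npartitions=2)

# Pack the layer into a nested column, then use the .nest accessor lazily
ndf = base_dn.add_nested(layer_dn, "nested")
print(ndf["nested"].nest.fields)       # e.g. ['t', 'flux']
flat = ndf["nested"].nest.to_flat()    # Dask frame of flat arrays
lists = ndf["nested"].nest.to_lists()  # Dask frame of list-arrays
print(flat.compute().head())
```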
diff --git a/tests/dask_nested/test_accessor.py b/tests/dask_nested/test_accessor.py new file mode 100644 index 0000000..8cc7603 --- /dev/null +++ b/tests/dask_nested/test_accessor.py @@ -0,0 +1,93 @@ +import pandas as pd +import pyarrow as pa +import pytest + + +def test_nest_accessor(test_dataset): + """test that the nest accessor is correctly tied to columns""" + + # Make sure that nested columns have the accessor available + assert hasattr(test_dataset.nested, "nest") + + # Make sure we get an attribute error when trying to use the wrong column + with pytest.raises(AttributeError): + test_dataset.ra.nest # noqa + + +def test_fields(test_dataset): + """test the fields accessor property""" + assert test_dataset.nested.nest.fields == ["t", "flux", "band"] + + +def test_to_flat(test_dataset): + """test the to_flat function""" + flat_ztf = test_dataset.nested.nest.to_flat() + + # check dtypes + assert flat_ztf.dtypes["t"] == pd.ArrowDtype(pa.float64()) + assert flat_ztf.dtypes["flux"] == pd.ArrowDtype(pa.float64()) + assert flat_ztf.dtypes["band"] == pd.ArrowDtype(pa.large_string()) + + # Make sure we retain all rows + assert len(flat_ztf.loc[1]) == 500 + + one_row = flat_ztf.loc[1].compute().iloc[1] + assert pytest.approx(one_row["t"], 0.01) == 5.4584 + assert pytest.approx(one_row["flux"], 0.01) == 84.1573 + assert one_row["band"] == "r" + + +def test_to_flat_with_fields(test_dataset): + """test the to_flat function""" + flat_ztf = test_dataset.nested.nest.to_flat(fields=["t", "flux"]) + + # check dtypes + assert flat_ztf.dtypes["t"] == pd.ArrowDtype(pa.float64()) + assert flat_ztf.dtypes["flux"] == pd.ArrowDtype(pa.float64()) + + # Make sure we retain all rows + assert len(flat_ztf.loc[1]) == 500 + + one_row = flat_ztf.loc[1].compute().iloc[1] + assert pytest.approx(one_row["t"], 0.01) == 5.4584 + assert pytest.approx(one_row["flux"], 0.01) == 84.1573 + + +def test_to_lists(test_dataset): + """test the to_lists function""" + list_ztf = test_dataset.nested.nest.to_lists() + + # check dtypes + assert list_ztf.dtypes["t"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_ztf.dtypes["flux"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_ztf.dtypes["band"] == pd.ArrowDtype(pa.list_(pa.large_string())) + + # Make sure we have a single row for an id + assert len(list_ztf.loc[1]) == 1 + + # Make sure we retain all rows -- double loc for speed and pandas get_item + assert len(list_ztf.loc[1].compute().loc[1]["t"]) == 500 + + # spot-check values + assert pytest.approx(list_ztf.loc[1].compute().loc[1]["t"][0], 0.01) == 7.5690279 + assert pytest.approx(list_ztf.loc[1].compute().loc[1]["flux"][0], 0.01) == 79.6886 + assert list_ztf.loc[1].compute().loc[1]["band"][0] == "g" + + +def test_to_lists_with_fields(test_dataset): + """test the to_lists function""" + list_ztf = test_dataset.nested.nest.to_lists(fields=["t", "flux"]) + + # check dtypes + assert list_ztf.dtypes["t"] == pd.ArrowDtype(pa.list_(pa.float64())) + assert list_ztf.dtypes["flux"] == pd.ArrowDtype(pa.list_(pa.float64())) + + # Make sure we have a single row for an id + assert len(list_ztf.loc[1]) == 1 + + # Make sure we retain all rows -- double loc for speed and pandas get_item + assert len(list_ztf.loc[1].compute().loc[1]["t"]) == 500 + + # spot-check values + assert pytest.approx(list_ztf.loc[1].compute().loc[1]["t"][0], 0.01) == 7.5690279 + assert pytest.approx(list_ztf.loc[1].compute().loc[1]["flux"][0], 0.01) == 79.6886 diff --git a/tests/dask_nested/test_datasets.py b/tests/dask_nested/test_datasets.py new file mode 
100644
index 0000000..b174d27
--- /dev/null
+++ b/tests/dask_nested/test_datasets.py
@@ -0,0 +1,20 @@
+import dask_nested as dn
+
+
+def test_generate_data():
+    """test the dataset generator function"""
+
+    # test the seed
+    generate_1 = dn.datasets.generate_data(10, 100, npartitions=2, seed=1)
+    generate_2 = dn.datasets.generate_data(10, 100, npartitions=2, seed=1)
+    generate_3 = dn.datasets.generate_data(10, 100, npartitions=2, seed=2)
+
+    assert generate_1.compute().equals(generate_2.compute())
+    assert not generate_1.compute().equals(generate_3.compute())
+
+    # test npartitions
+    assert generate_1.npartitions == 2
+
+    # test the length
+    assert len(generate_1) == 10
+    assert len(generate_1.nested.nest.to_flat()) == 1000
diff --git a/tests/dask_nested/test_example_module.py b/tests/dask_nested/test_example_module.py
deleted file mode 100644
index 6c5f230..0000000
--- a/tests/dask_nested/test_example_module.py
+++ /dev/null
@@ -1,13 +0,0 @@
-from dask_nested import example_module
-
-
-def test_greetings() -> None:
-    """Verify the output of the `greetings` function"""
-    output = example_module.greetings()
-    assert output == "Hello from LINCC-Frameworks!"
-
-
-def test_meaning() -> None:
-    """Verify the output of the `meaning` function"""
-    output = example_module.meaning()
-    assert output == 42
diff --git a/tests/dask_nested/test_io.py b/tests/dask_nested/test_io.py
new file mode 100644
index 0000000..d7f11ec
--- /dev/null
+++ b/tests/dask_nested/test_io.py
@@ -0,0 +1,31 @@
+import dask_nested as dn
+
+
+def test_read_parquet(test_dataset, tmp_path):
+    """test the round-trip fidelity of read_parquet"""
+
+    # Setup a temporary directory for files
+    nested_save_path = tmp_path / "nested"
+    test_save_path = tmp_path / "test_dataset"
+
+    # Save Nested to Parquet
+    flat_nested = test_dataset.nested.nest.to_flat()
+    flat_nested.to_parquet(nested_save_path, write_index=True)
+
+    # Save Base to Parquet
+    test_dataset[["a", "b"]].to_parquet(test_save_path, write_index=True)
+
+    # Now read
+    base = dn.read_parquet(test_save_path, calculate_divisions=True)
+    nested = dn.read_parquet(nested_save_path, calculate_divisions=True)
+
+    base = base.add_nested(nested, "nested")
+
+    # Check the loaded dataset against the original
+    assert base.divisions == test_dataset.divisions  # equal divisions
+    assert base.compute().equals(test_dataset.compute())  # equal data
+
+    # Check the loaded flat nested data against the original flat nested data
+    base_nested_flat = base.nested.nest.to_flat().compute()
+    test_nested_flat = test_dataset.nested.nest.to_flat().compute()
+    assert base_nested_flat.equals(test_nested_flat)
diff --git a/tests/dask_nested/test_nestedframe.py b/tests/dask_nested/test_nestedframe.py
new file mode 100644
index 0000000..5dfed86
--- /dev/null
+++ b/tests/dask_nested/test_nestedframe.py
@@ -0,0 +1,145 @@
+import dask_nested as dn
+import numpy as np
+import pytest
+from nested_pandas.series.dtype import NestedDtype
+
+
+def test_nestedframe_construction(test_dataset):
+    """test the construction of a nestedframe"""
+    assert len(test_dataset) == 50
+    assert test_dataset.columns.to_list() == ["a", "b", "nested"]
+    assert isinstance(test_dataset["nested"].dtype, NestedDtype)
+
+
+def test_all_columns(test_dataset):
+    """all_columns property test"""
+    all_cols = test_dataset.all_columns
+
+    assert all_cols["base"].to_list() == test_dataset.columns.to_list()
+    assert all_cols["nested"] == ["t", "flux", "band"]
+
+
+def test_nested_columns(test_dataset):
+    """nested_columns property test"""
+    assert test_dataset.nested_columns == ["nested"]
+
+
+def
test_add_nested(test_dataset_no_add_nested): + """test the add_nested function""" + base, layer = test_dataset_no_add_nested + + base_with_nested = base.add_nested(layer, "nested") + + # Check that the result is a nestedframe + assert isinstance(base_with_nested, dn.NestedFrame) + + # Check that there's a new nested column with the correct dtype + assert "nested" in base_with_nested.columns + assert isinstance(base_with_nested.dtypes["nested"], NestedDtype) + + # Check that the nested partitions were used + assert base_with_nested.npartitions == 10 + + assert len(base_with_nested.compute()) == 50 + + +def test_query_on_base(test_dataset): + """test the query function on base columns""" + + # Try a few basic queries + assert len(test_dataset.query("a > 0.5").compute()) == 22 + assert len(test_dataset.query("a > 0.5 & b > 1").compute()) == 13 + assert len(test_dataset.query("a > 2").compute()) == 0 + + +def test_query_on_nested(test_dataset): + """test the query function on nested columns""" + + # Try a few nested queries + res = test_dataset.query("nested.flux>75").compute() + assert len(res.loc[1]["nested"]) == 127 + + res = test_dataset.query("nested.band == 'g'").compute() + + assert len(res.loc[1]["nested"]) == 232 + assert len(res) == 50 # make sure the base df remains unchanged + + +def test_dropna(test_dataset_with_nans): + """test the dropna function""" + + nan_free_base = test_dataset_with_nans.dropna(subset=["a"]) + # should just remove one row + assert len(nan_free_base) == len(test_dataset_with_nans) - 1 + + meta = test_dataset_with_nans.loc[0].head(0).nested.nest.to_flat() + + nan_free_nested = test_dataset_with_nans.dropna(subset=["nested.t"]) + + flat_nested_nan_free = nan_free_nested.map_partitions(lambda x: x.nested.nest.to_flat(), meta=meta) + flat_nested = test_dataset_with_nans.map_partitions(lambda x: x.nested.nest.to_flat(), meta=meta) + # should just remove one row + assert len(flat_nested_nan_free) == len(flat_nested) - 1 + + +def test_reduce(test_dataset): + """test the reduce function""" + + def reflect_inputs(*args): + return args + + res = test_dataset.reduce(reflect_inputs, "a", "nested.t", meta=("inputs", float)) + + assert len(res) == 50 + assert isinstance(res.compute().loc[0][0], float) + assert isinstance(res.compute().loc[0][1], np.ndarray) + + res2 = test_dataset.reduce(np.mean, "nested.flux", meta=("mean", float)) + + assert pytest.approx(res2.compute()[15], 0.1) == 53.635174 + assert pytest.approx(sum(res2.compute()), 0.1) == 2488.960119 + + +def test_to_parquet_combined(test_dataset, tmp_path): + """test to_parquet when saving all layers to a single directory""" + + test_save_path = tmp_path / "test_dataset" + + # send to parquet + test_dataset.to_parquet(test_save_path, by_layer=False) + + # load back from parquet + loaded_dataset = dn.read_parquet(test_save_path, calculate_divisions=True) + # todo: file bug for this and investigate + loaded_dataset = loaded_dataset.reset_index().set_index("index") + + # Check for equivalence + assert test_dataset.divisions == loaded_dataset.divisions + + test_dataset = test_dataset.compute() + loaded_dataset = loaded_dataset.compute() + + assert test_dataset.equals(loaded_dataset) + + +def test_to_parquet_by_layer(test_dataset, tmp_path): + """test to_parquet when saving layers to subdirectories""" + + test_save_path = tmp_path / "test_dataset" + + # send to parquet + test_dataset.to_parquet(test_save_path, by_layer=True, write_index=True) + + # load back from parquet + loaded_base = dn.read_parquet(test_save_path / 
"base", calculate_divisions=True) + loaded_nested = dn.read_parquet(test_save_path / "nested", calculate_divisions=True) + + loaded_dataset = loaded_base.add_nested(loaded_nested, "nested") + + # Check for equivalence + assert test_dataset.divisions == loaded_dataset.divisions + + test_dataset = test_dataset.compute() + loaded_dataset = loaded_dataset.compute() + + assert test_dataset.equals(loaded_dataset)