diff --git a/docs/api/mod.rst b/docs/api/mod.rst index 6abb48b..3ead6d9 100644 --- a/docs/api/mod.rst +++ b/docs/api/mod.rst @@ -10,7 +10,6 @@ Most useful functions and classes are contained in submodules. errors io pandas - parquet record isd diff --git a/docs/api/parquet.rst b/docs/api/parquet.rst deleted file mode 100644 index c47be94..0000000 --- a/docs/api/parquet.rst +++ /dev/null @@ -1,5 +0,0 @@ -isd.parquet -=========== - -.. automodule:: isd.parquet - :members: diff --git a/examples/examine_parquet_metadata.py b/examples/examine_parquet_metadata.py deleted file mode 100644 index 597af0b..0000000 --- a/examples/examine_parquet_metadata.py +++ /dev/null @@ -1,30 +0,0 @@ -"""Given a list of one or more parquet files, examine the metadata that would be produced by writing them to parquet using this library. - -The data in the provided files is repartioned by a default interval. -""" - -import os.path -import sys -from tempfile import TemporaryDirectory -from typing import Any, List - -import pandas -import pyarrow.parquet -from pyarrow import Table - -import isd.io - -paths = sys.argv[1:] -data_frames = [isd.io.read_to_data_frame(path) for path in paths] -data_frame = pandas.concat(data_frames) -data_frame = data_frame.set_index(["timestamp", "usaf_id", "ncei_id"]) -table = Table.from_pandas(data_frame) - -metadata_collector: List[Any] = [] -with TemporaryDirectory() as temporary_directory: - directory = os.path.join(temporary_directory, "isd") - pyarrow.parquet.write_to_dataset( - table, directory, metadata_collector=metadata_collector - ) - -print(metadata_collector) diff --git a/examples/read_parquet.py b/examples/read_parquet.py deleted file mode 100644 index 1779b12..0000000 --- a/examples/read_parquet.py +++ /dev/null @@ -1,9 +0,0 @@ -"""Reads an ISD parquet table and prints it to stdout.""" - -import sys - -import isd.parquet - -directory = sys.argv[1] -data_frame = isd.parquet.read(directory).compute() -print(data_frame) diff --git a/setup.cfg b/setup.cfg index 4310e25..55d639d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -20,9 +20,7 @@ packages = find: python_requires = >=3.7 install_requires = click ~= 8.0 - dask[distributed] ~= 2021.08 pandas ~= 1.3 - pyarrow ~= 6.0 [options.packages.find] diff --git a/src/isd/cli.py b/src/isd/cli.py index c256aa2..533fd04 100644 --- a/src/isd/cli.py +++ b/src/isd/cli.py @@ -3,15 +3,11 @@ import dataclasses import itertools import json -import os -from typing import List, Optional import click from click import ClickException -from dask.distributed import Client import isd.io -import isd.parquet @click.group() @@ -30,23 +26,3 @@ def record(infile: str, index: int) -> None: print(json.dumps(dataclasses.asdict(record), indent=4)) else: raise ClickException(f"No record with index {index}") - - -@main.command() -@click.argument("INFILES", nargs=-1) -@click.argument("DIRECTORY") -@click.option("-c", "--client") -def to_parquet( - infiles: List[str], directory: str, client: Optional[str] = None -) -> None: - """Writes one or more ISD files to a parquet table.""" - paths = [] - for infile in infiles: - if os.path.isdir(infile): - for file_name in os.listdir(infile): - paths.append(os.path.abspath(os.path.join(infile, file_name))) - else: - paths.append(os.path.abspath(infile)) - if client: - Client(client) - isd.parquet.write(paths, directory) diff --git a/src/isd/parquet.py b/src/isd/parquet.py deleted file mode 100644 index d42f0f3..0000000 --- a/src/isd/parquet.py +++ /dev/null @@ -1,40 +0,0 @@ -from typing import List - -import dask.dataframe -import dask.delayed - -import isd.io - -DEFAULT_REPARTITION_FREQUENCY = "2w" - - -def write( - paths: List[str], - directory: str, - append: bool = False, - repartition_frequency: str = DEFAULT_REPARTITION_FREQUENCY, -) -> None: - """Writes the ISD data located in `paths` to a partitioned parquet table at `directory`. - - Uses dask under the hood to read the data and write the output file. - """ - if append: - data_frame = read(directory) - last_division = data_frame.divisions[-1] - data_frames = [ - dask.delayed(isd.io.read_to_data_frame)(path, since=last_division) - for path in paths - ] - else: - data_frames = [dask.delayed(isd.io.read_to_data_frame)(path) for path in paths] - data_frame = dask.dataframe.from_delayed(data_frames) - data_frame = data_frame.persist() - data_frame = data_frame.set_index("timestamp").repartition( - freq=repartition_frequency - ) - data_frame.to_parquet(directory, append=append) - - -def read(directory: str) -> dask.dataframe.DataFrame: - """Reads a parquet table as a dask data frame.""" - return dask.dataframe.read_parquet(directory) diff --git a/tests/test_parquet.py b/tests/test_parquet.py deleted file mode 100644 index bbf0c11..0000000 --- a/tests/test_parquet.py +++ /dev/null @@ -1,21 +0,0 @@ -import os.path -from tempfile import TemporaryDirectory -from typing import List - -import isd.parquet - - -def test_write_then_read(paths: List[str]) -> None: - with TemporaryDirectory() as temporary_directory: - directory = os.path.join(temporary_directory, "isd") - isd.parquet.write(paths, directory) - isd.parquet.read(directory).compute() - - -def test_append(half_path: str, uncompressed_path: str) -> None: - with TemporaryDirectory() as temporary_directory: - directory = os.path.join(temporary_directory, "isd") - isd.parquet.write([half_path], directory) - isd.parquet.write([uncompressed_path], directory, append=True) - data_frame = isd.parquet.read(directory).compute() - assert len(data_frame) == 500