Commit c7fd294
Merge branch 'main' into enh-fuzzydedup-nofpcheck-default
ayushdg authored Nov 22, 2024
2 parents 89baf93 + 9173db3 · commit c7fd294
Showing 34 changed files with 1,054 additions and 306 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/release.yml

@@ -23,7 +23,7 @@ on:

jobs:
  release:
-    uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/[email protected].0
+    uses: NVIDIA/NeMo-FW-CI-templates/.github/workflows/[email protected].3
    with:
      release-ref: ${{ inputs.release-ref }}
      image-name: nemo_curator_container

@@ -41,3 +41,4 @@ jobs:
      TWINE_USERNAME: ${{ secrets.TWINE_USERNAME }}
      TWINE_PASSWORD: ${{ secrets.TWINE_PASSWORD }}
      SLACK_RELEASE_ENDPOINT: ${{ secrets.SLACK_RELEASE_ENDPOINT }}
+      PAT: ${{ secrets.PAT }}
27 changes: 20 additions & 7 deletions nemo_curator/_compat.py

@@ -15,18 +15,31 @@
import sys

import dask
-from packaging.version import parse as parseVersion
+from packaging.version import parse as parse_version

try:
-    _dask_version = parseVersion(dask.__version__)
+    _dask_version = parse_version(dask.__version__)
except TypeError:
    # When mocking with autodoc the dask version is not there
-    _dask_version = parseVersion("2024.06.0")
+    _dask_version = parse_version("2024.06.0")

+try:
+    import cudf
+
+    CURRENT_CUDF_VERSION = parse_version(cudf.__version__)
+except (ImportError, TypeError):
+    CURRENT_CUDF_VERSION = parse_version("24.10.0")
+
+# TODO remove this once 24.12.0 becomes the base version of cudf in nemo-curator
+MINHASH_PERMUTED_AVAILABLE = CURRENT_CUDF_VERSION >= parse_version("24.12.0") or (
+    CURRENT_CUDF_VERSION.is_prerelease
+    and CURRENT_CUDF_VERSION.base_version >= "24.12.0"
+)
+
# TODO: remove when dask min version gets bumped
-DASK_SHUFFLE_METHOD_ARG = _dask_version > parseVersion("2024.1.0")
-DASK_P2P_ERROR = _dask_version < parseVersion("2023.10.0")
-DASK_SHUFFLE_CAST_DTYPE = _dask_version > parseVersion("2023.12.0")
+DASK_SHUFFLE_METHOD_ARG = _dask_version > parse_version("2024.1.0")
+DASK_P2P_ERROR = _dask_version < parse_version("2023.10.0")
+DASK_SHUFFLE_CAST_DTYPE = _dask_version > parse_version("2023.12.0")

# Query-planning check (and cache)
_DASK_QUERY_PLANNING_ENABLED = None

@@ -36,7 +49,7 @@ def query_planning_enabled():
    global _DASK_QUERY_PLANNING_ENABLED

    if _DASK_QUERY_PLANNING_ENABLED is None:
-        if _dask_version > parseVersion("2024.6.0"):
+        if _dask_version > parse_version("2024.6.0"):
            import dask.dataframe as dd

            _DASK_QUERY_PLANNING_ENABLED = dd.DASK_EXPR_ENABLED
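
The cuDF gate added above mirrors the existing Dask checks: parse the installed version once at import time and expose a module-level boolean. A minimal, self-contained sketch of that version-gating logic, evaluated against a few hypothetical version strings (illustrative only, not specific cuDF releases):

# Illustrative sketch of the MINHASH_PERMUTED_AVAILABLE check above,
# run against made-up version strings.
from packaging.version import parse as parse_version

for raw in ["24.10.0", "24.12.0", "24.12.0a123", "25.02.1"]:
    v = parse_version(raw)
    # A plain >= comparison rejects prereleases of 24.12.0, so the flag also
    # accepts prereleases whose base version is already 24.12.0 or newer.
    available = v >= parse_version("24.12.0") or (
        v.is_prerelease and v.base_version >= "24.12.0"
    )
    print(f"{raw:>12} -> available = {available}")
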
96 changes: 67 additions & 29 deletions nemo_curator/datasets/doc_dataset.py

@@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-from typing import List, Optional, Union
+import os
+from typing import Any, List, Literal, Optional, Union

import dask.dataframe as dd

@@ -29,26 +30,40 @@ class DocumentDataset:
    def __init__(self, dataset_df: dd.DataFrame):
        self.df = dataset_df

-    def __len__(self):
+    def __len__(self) -> int:
        return len(self.df)

-    def persist(self):
+    # `def persist(self) -> Self` requires Python 3.11 or higher
+    def persist(self) -> "DocumentDataset":
        return DocumentDataset(self.df.persist())

-    def head(self, n=5):
+    def head(self, n: int = 5) -> Any:
        return self.df.head(n)

    @classmethod
    def read_json(
        cls,
        input_files: Union[str, List[str]],
-        backend: str = "pandas",
+        backend: Literal["pandas", "cudf"] = "pandas",
        files_per_partition: int = 1,
        add_filename: bool = False,
        input_meta: Union[str, dict] = None,
+        columns: Optional[List[str]] = None,
        **kwargs,
-    ):
+    ) -> "DocumentDataset":
+        """
+        Read JSON or JSONL file(s).
+
+        Args:
+            input_files: The path of the input file(s).
+            backend: The backend to use for reading the data.
+            files_per_partition: The number of files to read per partition.
+            add_filename: Whether to add a "filename" column to the DataFrame.
+            input_meta: A dictionary or a string formatted as a dictionary, which outlines
+                the field names and their respective data types within the JSONL input file.
+            columns: If not None, only these columns will be read from the file.
+        """
        return cls(
            _read_json_or_parquet(
                input_files=input_files,

@@ -65,13 +80,25 @@ def read_json(
    @classmethod
    def read_parquet(
        cls,
-        input_files,
-        backend="pandas",
-        files_per_partition=1,
-        add_filename=False,
+        input_files: Union[str, List[str]],
+        backend: Literal["pandas", "cudf"] = "pandas",
+        files_per_partition: int = 1,
+        add_filename: bool = False,
+        columns: Optional[List[str]] = None,
        **kwargs,
-    ):
+    ) -> "DocumentDataset":
+        """
+        Read Parquet file(s).
+
+        Args:
+            input_files: The path of the input file(s).
+            backend: The backend to use for reading the data.
+            files_per_partition: The number of files to read per partition.
+            add_filename: Whether to add a "filename" column to the DataFrame.
+            columns: If not None, only these columns will be read from the file.
+                There is a significant performance gain when specifying columns for Parquet files.
+        """
        return cls(
            _read_json_or_parquet(
                input_files=input_files,

@@ -87,13 +114,24 @@ def read_parquet(
    @classmethod
    def read_pickle(
        cls,
-        input_files,
-        backend="pandas",
-        files_per_partition=1,
-        add_filename=False,
+        input_files: Union[str, List[str]],
+        backend: Literal["pandas", "cudf"] = "pandas",
+        files_per_partition: int = 1,
+        add_filename: bool = False,
+        columns: Optional[List[str]] = None,
        **kwargs,
-    ):
+    ) -> "DocumentDataset":
+        """
+        Read Pickle file(s).
+
+        Args:
+            input_files: The path of the input file(s).
+            backend: The backend to use for reading the data.
+            files_per_partition: The number of files to read per partition.
+            add_filename: Whether to add a "filename" column to the DataFrame.
+            columns: If not None, only these columns will be read from the file.
+        """
        return cls(
            read_data(
                input_files=input_files,

@@ -108,12 +146,12 @@ def read_pickle(

    def to_json(
        self,
-        output_file_dir,
-        write_to_filename=False,
-        keep_filename_column=False,
+        output_file_dir: str,
+        write_to_filename: bool = False,
+        keep_filename_column: bool = False,
    ):
        """
-        See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
+        See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
        """
        write_to_disk(

@@ -126,12 +164,12 @@ def to_json(

    def to_parquet(
        self,
-        output_file_dir,
-        write_to_filename=False,
-        keep_filename_column=False,
+        output_file_dir: str,
+        write_to_filename: bool = False,
+        keep_filename_column: bool = False,
    ):
        """
-        See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
+        See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
        """
        write_to_disk(

@@ -144,8 +182,8 @@ def to_parquet(

    def to_pickle(
        self,
-        output_file_dir,
-        write_to_filename=False,
+        output_file_dir: str,
+        write_to_filename: bool = False,
    ):
        raise NotImplementedError("DocumentDataset does not support to_pickle yet")

@@ -190,7 +228,7 @@ def to_pandas(self):
def _read_json_or_parquet(
    input_files: Union[str, List[str]],
    file_type: str,
-    backend: str,
+    backend: Literal["cudf", "pandas"],
    files_per_partition: int,
    add_filename: bool,
    input_meta: Union[str, dict] = None,

@@ -217,8 +255,8 @@ def _read_json_or_parquet(
    file_ext = "." + file_type

    if isinstance(input_files, list):
-        # List of jsonl or parquet files
-        if all(f.endswith(file_ext) for f in input_files):
+        # List of files
+        if all(os.path.isfile(f) for f in input_files):
            raw_data = read_data(
                input_files,
                file_type=file_type,
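
For context, the annotated readers compose as in this minimal usage sketch; the import assumes DocumentDataset is re-exported from nemo_curator.datasets, and the file paths, column names, and input_meta schema are made up for illustration rather than taken from this commit:

# Hypothetical usage example; paths and schema below are illustrative only.
from nemo_curator.datasets import DocumentDataset

dataset = DocumentDataset.read_json(
    input_files="/data/jsonl_dir/",            # a directory or an explicit list of .jsonl files
    backend="pandas",                          # or "cudf" to read on the GPU
    input_meta={"id": "str", "text": "str"},   # field names and dtypes in the JSONL input
    columns=["id", "text"],                    # read only the columns that are needed
)

print(len(dataset))           # __len__ is annotated to return int
dataset = dataset.persist()   # persist() returns a DocumentDataset, so calls keep chaining
dataset.to_parquet("/data/parquet_out/", write_to_filename=False)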