
fuzzy_dedup OOM issue #471

Open
chenrui17 opened this issue Jan 7, 2025 · 2 comments
Labels: bug (Something isn't working)

Comments


chenrui17 commented Jan 7, 2025

Describe the bug
I used 5 A100 GPUs to run a fuzzy_dedup task and hit an out-of-memory error. Here is the error info:

2024-12-31 05:02:43,370 - distributed.worker - ERROR - Could not serialize object of type DataFrame
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/serialize.py", line 366, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
  File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/serialize.py", line 52, in dask_dumps
    sub_header, frames = dumps(x)
  File "/usr/local/lib/python3.10/dist-packages/cudf/comm/serialize.py", line 19, in dask_serialize_cudf_object
    return x.host_serialize()
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/abc.py", line 150, in host_serialize
    header, frames = self.device_serialize()
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/abc.py", line 90, in device_serialize
    header, frames = self.serialize()
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/dataframe.py", line 1171, in serialize
    header, frames = super().serialize()
  File "/usr/local/lib/python3.10/dist-packages/cudf/utils/performance_tracking.py", line 51, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/frame.py", line 100, in serialize
    header["columns"], frames = serialize_columns(self._columns)
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 2279, in serialize_columns
    header_columns = [c.serialize() for c in columns]
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 2279, in <listcomp>
    header_columns = [c.serialize() for c in columns]
  File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 1226, in serialize
    if self.children:
  File "column.pyx", line 293, in cudf._lib.column.Column.children.__get__
MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /tmp/pip-build-env-fkfud_57/normal/lib/python3.10/site-packages/librmm/include/rmm/mr/device/cuda_memory_resource.hpp:60: cudaErrorMemoryAllocation out of memory

Steps/Code to reproduce bug

import argparse
import time
import os
from datetime import datetime
from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules import ExactDuplicates
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.script_utils import ArgumentHelper
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
INPUT_DIR = os.environ.get("TMP_LLM_STEP_INPUT_DIR")
OUTPUT_DIR = os.environ.get("TMP_LLM_STEP_OUTPUT_DIR")
CACHE_DIR = os.environ.get("TMP_LLM_STEP_CACHE_DIR")
LOG_DIR = os.environ.get("TMP_LLM_NEMO_LOG_ROOT")
PROFILE_DIR = os.environ.get("TMP_LLM_STEP_PROFILE_DIR")
SYS_LOG_FILE = os.environ.get("TMP_LLM_NEMO_PIPELINE_LOG")
ID_FIELD = os.environ.get("TMP_LLM_NEMO_ID_FIELD")
TEXT_FIELD = os.environ.get("TMP_LLM_NEMO_TEXT_FIELD")
INPUT_DATA_FORMAT = os.environ.get("TMP_LLM_NEMO_INPUT_DATA_FORMAT")
OUTPUT_DATA_FORMAT = os.environ.get("TMP_LLM_NEMO_OUTPUT_DATA_FORMAT")

def pre_imports():
    import dask, cudf, dask_cudf
    
def attach_args(
        parser=argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    ):
    return ArgumentHelper(parser).add_distributed_args()

def pipeline_log(msg, level = "INFO"):
    log_file_path = SYS_LOG_FILE
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    log_entry = f"[{current_time}]: [{level}]: {msg}\n"
    print(log_entry)
    with open(log_file_path, 'a') as log_file:
        log_file.write(log_entry)

def ensure_directory_exists(filename: str):
    os.makedirs(os.path.dirname(filename), exist_ok=True)

def load_dataset(input_data_dir, file_type = "parquet", backend = "cudf"):
    files = list(get_all_files_paths_under(input_data_dir))
    raw_data = read_data(files, file_type = file_type, backend = backend, add_filename=True)
    dataset = DocumentDataset(raw_data)
    return dataset

def fuzzy_dedup(input_dataset):
    fuzzy_dedup_config = FuzzyDuplicatesConfig(
            cache_dir = CACHE_DIR,
            id_field = ID_FIELD,
            text_field = TEXT_FIELD,
            seed=42,
            char_ngrams=5,
            num_buckets=20,
            hashes_per_bucket=13,
            use_64_bit_hash=False,
            buckets_per_shuffle=5,
            false_positive_check=True,
            num_anchors=2,
            jaccard_threshold=0.8,
        )
    fuzzy_dup = FuzzyDuplicates(logger = LOG_DIR, config = fuzzy_dedup_config)
    duplicates = fuzzy_dup(dataset = input_dataset)
    docs_to_remove = duplicates.df.map_partitions(
        lambda x: x[x.group.duplicated(keep="first")]
    )
    result = input_dataset.df[
        ~input_dataset.df[ID_FIELD].isin(
            docs_to_remove[ID_FIELD].compute()
        )
    ]
    return DocumentDataset(result)

def main(args):
    client = get_client(**ArgumentHelper.parse_client_args(args))
    backend = "pandas"
    if args.device == "gpu":
        backend = "cudf"
        pre_imports()
    pipeline_log(f"dedup source: {INPUT_DIR}")
    input_dataset = load_dataset(INPUT_DIR)
    doc_total = len(input_dataset)
    pipeline_log(f"{doc_total} lines read")
    dataset_processed = fuzzy_dedup(input_dataset)
    doc_count = len(dataset_processed)
    pipeline_log(f"{doc_count} linese remains after fuzzy dedup")
    pipeline_log(f"output path: {OUTPUT_DIR}")
    write_to_disk(dataset_processed.df, OUTPUT_DIR, output_type = OUTPUT_DATA_FORMAT)
    pipeline_log("write completed")

if __name__ == "__main__":
    main(attach_args().parse_args())

Environment overview

5 servers, each with 1× A100 (80 GB), 70 CPU cores, and 320 GB of CPU memory.

Additional context

The input is dclm-baseline 1.0 Parquet data, about 8 TB in total (uncompressed, after adding nemo_id).

chenrui17 added the bug label on Jan 7, 2025
ayushdg (Collaborator) commented Jan 9, 2025

Thanks for raising the issue @chenrui17.
For 8 TB of input data on 5 A100 GPUs (~400 GB of total GPU memory), the memory needed to hold intermediates during stages like LSH might lead to OOMs.

I have a few recommendations to reduce memory and computational requirements at this scale.

char_ngrams=24, # use larger ngram size to reduce false positives.
buckets_per_shuffle=1, # process 1 bucket per iteration of LSH to reduce memory requirements.

# skip the false positive check which is computationally expensive. 
# In practice this is usually 1-2% of documents based on our experiments.
false_positive_check=False,

Some of the changes suggested above are becoming the default in Curator (see #386).
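
For reference, here is a minimal sketch (mine, not from the thread) of the repro script's FuzzyDuplicatesConfig with the suggestions above applied. All other values are carried over from the script in the issue; num_anchors and jaccard_threshold are omitted because, as I understand it, they only apply when false_positive_check=True.

# Illustrative sketch: the repro script's config with the suggested changes applied.
fuzzy_dedup_config = FuzzyDuplicatesConfig(
    cache_dir=CACHE_DIR,
    id_field=ID_FIELD,
    text_field=TEXT_FIELD,
    seed=42,
    char_ngrams=24,              # larger ngram size -> fewer false positives
    num_buckets=20,
    hashes_per_bucket=13,
    use_64_bit_hash=False,
    buckets_per_shuffle=1,       # one LSH bucket per shuffle to lower peak memory
    false_positive_check=False,  # skip the computationally expensive false-positive check
    # num_anchors / jaccard_threshold omitted: only used by the false-positive check
)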

Additionally, I would recommend keeping Parquet files <= 2 GB uncompressed if you have large files. If you are using many small files, you can pass blocksize="1gb" to read_parquet, or specify files_per_partition to combine multiple Parquet files into a single block for processing on prior versions of Curator. For reference, the read_parquet signature:

def read_parquet(
    cls,
    input_files: Union[str, List[str]],
    backend: Literal["pandas", "cudf"] = "pandas",
    files_per_partition: Optional[int] = None,
    blocksize: Optional[str] = "1gb",
    add_filename: Union[bool, str] = False,
    columns: Optional[List[str]] = None,
    **kwargs,
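
As an illustration only (assuming the read_parquet signature quoted above), the load step from the repro script could be switched over to a ~1 GB blocksize like this:

# Illustrative sketch: load the input via DocumentDataset.read_parquet with a
# ~1 GB blocksize instead of the read_data helper used in the repro script.
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.file_utils import get_all_files_paths_under

files = list(get_all_files_paths_under(INPUT_DIR))  # INPUT_DIR as in the script above
input_dataset = DocumentDataset.read_parquet(
    files,
    backend="cudf",
    blocksize="1gb",  # target ~1 GB partitions (combines many small files into one block)
)

On older Curator versions without blocksize support, files_per_partition can be set instead to group several small Parquet files into a single partition.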

ayushdg (Collaborator) commented Jan 9, 2025

Internally, we've typically used 16-24 GPUs for processing data at this scale, so I'm not sure these suggestions will prevent OOM errors on 5 GPUs, but I'm happy to follow up and see if they improve things.
