Add tests to check compatibility with fastparquet #9366

Merged · 23 commits · Oct 23, 2023

Changes from 7 commits (of 23)

Commits
74d31f1  WIP: Initial stab at fastparquet tests. (mythrocks, Sep 29, 2023)
d3394dc  Date tests. Plus minor refactor. (mythrocks, Sep 29, 2023)
27ab6d9  Date/Time tests. (mythrocks, Sep 29, 2023)
b309918  Added tests for reading data written with fastparquet. (mythrocks, Oct 2, 2023)
45000f9  Tests for reading GPU-written files. (mythrocks, Oct 2, 2023)
cad94bb  Added failing tests for arrays, struct. (mythrocks, Oct 5, 2023)
725c316  Clarification of failure conditions. (mythrocks, Oct 5, 2023)
1b141c2  Workaround tests for timestamps. (mythrocks, Oct 5, 2023)
641fa14  Workaround tests for dates. (mythrocks, Oct 5, 2023)
ef9c5f1  Miscellaneous fixes: (mythrocks, Oct 5, 2023)
782e076  Test descriptions. (mythrocks, Oct 5, 2023)
411612e  Workaround tests for STRUCT, ARRAY, etc. (mythrocks, Oct 5, 2023)
3624fac  Added xfails for struct/array. (mythrocks, Oct 5, 2023)
6bcff7a  Updated with concrete fastparquet version. (mythrocks, Oct 5, 2023)
4a67ef0  Fixed up some xfail messages. (mythrocks, Oct 6, 2023)
b6b7d19  Fixed another xfail message. (mythrocks, Oct 9, 2023)
c9f7e2c  Extend date/time margins to Pandas.Timestamp.min and Pandas.Timestamp… (mythrocks, Oct 9, 2023)
fceafc2  Added dependency to CI scripts, Docker images. (mythrocks, Oct 9, 2023)
b10a321  Change in tack: Install fastparquet explicitly. (mythrocks, Oct 9, 2023)
f34eec7  Per #8789, reverted change for Centos Dockerfile. (mythrocks, Oct 10, 2023)
126b9f4  Removed fastparquet from UDF tests. (mythrocks, Oct 10, 2023)
fa356f8  Optionally skips fastparquet tests. (mythrocks, Oct 10, 2023)
c072dc5  Merge remote-tracking branch 'origin/branch-23.12' into fastparquet-c… (mythrocks, Oct 10, 2023)
3 changes: 2 additions & 1 deletion integration_tests/requirements.txt
@@ -16,4 +16,5 @@ sre_yield
 pandas
 pyarrow
 pytest-xdist >= 2.0.0
-findspark
+findspark
+fastparquet
211 changes: 211 additions & 0 deletions integration_tests/src/main/python/fastparquet_compatibility_test.py
@@ -0,0 +1,211 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from datetime import date, datetime, timezone
from data_gen import *
from pyspark.sql.types import *
from spark_session import with_cpu_session, with_gpu_session
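
# 'CORRECTED' mode writes dates/timestamps in the proleptic Gregorian calendar,
# without rebasing to the legacy hybrid Julian/Gregorian calendar. This keeps
# the on-disk values unambiguous for non-Spark readers such as fastparquet.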

rebase_write_corrected_conf = {
'spark.sql.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.parquet.int96RebaseModeInWrite': 'CORRECTED'
}


def read_parquet(data_path):
"""
Returns a function that reads Parquet from the specified `data_path`.
If the plugin is enabled, the read is done via Spark APIs, through the plugin.
If the plugin is disabled, the data is read via `fastparquet`.
:param data_path: Location of the (single) Parquet input file.
:return: A function that reads Parquet, via the plugin or `fastparquet`.
"""

def read_with_fastparquet_or_plugin(spark):
plugin_enabled = spark.conf.get("spark.rapids.sql.enabled", "false") == "true"
if plugin_enabled:
return spark.read.parquet(data_path)
else:
import fastparquet
df = fastparquet.ParquetFile(data_path).to_pandas()
return spark.createDataFrame(df)

return read_with_fastparquet_or_plugin
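
# A minimal usage sketch of the helper above (illustrative only; the path is
# hypothetical). The same callable serves both legs of the comparison: the CPU
# leg exercises fastparquet, the GPU leg exercises the plugin.
#
#   reader = read_parquet("/tmp/example.parquet")
#   cpu_rows = with_cpu_session(lambda spark: reader(spark).collect())  # fastparquet path
#   gpu_rows = with_gpu_session(lambda spark: reader(spark).collect())  # plugin path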


@pytest.mark.parametrize('corrected_conf', [rebase_write_corrected_conf])
@pytest.mark.parametrize('data_gen', [
ByteGen(nullable=False),
ShortGen(nullable=False),
IntegerGen(nullable=False),
pytest.param(IntegerGen(nullable=True),
marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")),
LongGen(nullable=False),
pytest.param(LongGen(nullable=True),
marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")),
FloatGen(nullable=False),
DoubleGen(nullable=False),
StringGen(nullable=False),
pytest.param(DecimalGen(nullable=False),
marks=pytest.mark.xfail(reason="fastparquet reads Decimal columns as Float, as per "
"https://fastparquet.readthedocs.io/en/latest/details.html#data-types")),
pytest.param(DateGen(nullable=False,
start=date(year=2020, month=1, day=1),
end=date(year=2020, month=12, day=31)),
marks=pytest.mark.xfail(reason="fastparquet reads dates as timestamps.")),
pytest.param(DateGen(nullable=False),
marks=pytest.mark.xfail(reason="fastparquet reads far future dates (e.g. year=8705) incorrectly.")),
TimestampGen(nullable=False,
start=datetime(2000, 1, 1, tzinfo=timezone.utc),
end=datetime(2200, 12, 31, tzinfo=timezone.utc)), # Vanilla case.
pytest.param(TimestampGen(nullable=False,
start=datetime(1, 1, 1, tzinfo=timezone.utc),
end=datetime(1899, 12, 31, tzinfo=timezone.utc)),
marks=pytest.mark.xfail(reason="fastparquet reads timestamps preceding 1900 incorrectly.")),
# TODO: Array gen type deduction borked when converting from Pandas to Spark dataframe.
# ArrayGen(child_gen=IntegerGen(nullable=False), nullable=False),
# TODO: Struct rows seem to be correct, but are failing comparison because of differences in Row representation.
# StructGen(children=[("first", IntegerGen(nullable=False)),
# ("second", FloatGen(nullable=False))], nullable=False)
], ids=idfn)
def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path, corrected_conf):
data_path = spark_tmp_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT"
gen = StructGen([('a', data_gen)], nullable=False)
# Write data with CPU session.
with_cpu_session(
# Single output file, to avoid differences in order of file reads.
lambda spark: gen_df(spark, gen, 3).repartition(1).write.mode('overwrite').parquet(data_path),
conf=corrected_conf
)
# Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records.
assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), conf=corrected_conf)
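
# Background for the Decimal xfails above (illustrative sketch, based on the
# fastparquet docs linked in the xfail reason): fastparquet has no native
# DECIMAL support and materializes such columns with a float dtype, e.g.
#
#   pf = fastparquet.ParquetFile("decimals.parquet")  # hypothetical file
#   pf.to_pandas()["a"].dtype                         # a float dtype, not Decimal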


@pytest.mark.parametrize('column_gen', [
ByteGen(nullable=False),
ShortGen(nullable=False),
IntegerGen(nullable=False),
pytest.param(IntegerGen(nullable=True),
marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")),
LongGen(nullable=False),
pytest.param(LongGen(nullable=True),
marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")),
FloatGen(nullable=False),
DoubleGen(nullable=False),
StringGen(nullable=False),
pytest.param(DecimalGen(nullable=False),
marks=pytest.mark.xfail(reason="fastparquet reads Decimal columns as Float, as per "
"https://fastparquet.readthedocs.io/en/latest/details.html#data-types")),
pytest.param(DateGen(nullable=False,
start=date(year=2020, month=1, day=1),
end=date(year=2020, month=12, day=31)),
marks=pytest.mark.xfail(reason="fastparquet reads dates as timestamps.")),
pytest.param(DateGen(nullable=False),
marks=pytest.mark.xfail(reason="fastparquet reads far future dates (e.g. year=8705) incorrectly.")),
TimestampGen(nullable=False,
start=datetime(2000, 1, 1, tzinfo=timezone.utc),
end=datetime(2200, 12, 31, tzinfo=timezone.utc)), # Vanilla case.
pytest.param(TimestampGen(nullable=False,
start=datetime(1, 1, 1, tzinfo=timezone.utc),
end=datetime(1899, 12, 31, tzinfo=timezone.utc)),
marks=pytest.mark.xfail(reason="fastparquet reads timestamps preceding 1900 incorrectly.")),
], ids=idfn)
def test_reading_file_written_with_gpu(spark_tmp_path, column_gen):
data_path = spark_tmp_path + "/FASTPARQUET_TEST_GPU_WRITE_PATH"

gen = StructGen([('a', column_gen),
('part', IntegerGen(nullable=False))
], nullable=False)
# Write data out with Spark RAPIDS plugin.
with_gpu_session(
lambda spark: gen_df(spark, gen, 2048).repartition(1).write.mode('overwrite').parquet(data_path),
conf=rebase_write_corrected_conf
)

# TODO: Maybe make _assert_equal() available to compare dataframes, regardless of CPU vs GPU?
# For now, this compares the results of reading back the GPU-written data, via fastparquet and GPU.
assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), conf=rebase_write_corrected_conf)


@pytest.mark.parametrize('column_gen', [
ByteGen(nullable=False),
ByteGen(nullable=True),
ShortGen(nullable=False),
ShortGen(nullable=True),
IntegerGen(nullable=False),
IntegerGen(nullable=True),
LongGen(nullable=False),
LongGen(nullable=True),
FloatGen(nullable=False),
FloatGen(nullable=True),
DoubleGen(nullable=False),
DoubleGen(nullable=True),
DecimalGen(nullable=False),
DecimalGen(nullable=True),
pytest.param(
StringGen(nullable=False),
marks=pytest.mark.xfail(reason="String columns written with fastparquet are read differently between "
"Apache Spark, and the Spark RAPIDS plugin. "
"See https://github.com/NVIDIA/spark-rapids/issues/9387.")),
pytest.param(
StringGen(nullable=True),
marks=pytest.mark.xfail(reason="String columns written with fastparquet are read differently between "
"Apache Spark, and the Spark RAPIDS plugin. "
"See https://github.com/NVIDIA/spark-rapids/issues/9387.")),
pytest.param(
DateGen(nullable=False,
start=date(year=2000, month=1, day=1),
end=date(year=2020, month=12, day=31)),
marks=pytest.mark.xfail(reason="spark_df.toPandas() problem: Dates generated in Spark can't be written "
"with fastparquet, because the dtype/encoding cannot be deduced.")),
pytest.param(
TimestampGen(nullable=False),
marks=pytest.mark.xfail(reason="Timestamps exceeding year=2300 are out of bounds for Pandas.")),
pytest.param(
TimestampGen(nullable=False,
start=datetime(2000, 1, 1, tzinfo=timezone.utc),
end=datetime(2200, 12, 31, tzinfo=timezone.utc)),
marks=pytest.mark.xfail(reason="spark_df.toPandas() problem: Timestamps in Spark can't be "
"converted to pandas, because of type errors. The error states: "
"\"TypeError: Casting to unit-less dtype 'datetime64' is not supported. "
"Pass e.g. 'datetime64[ns]' instead.\"")),
pytest.param(
ArrayGen(IntegerGen(nullable=False), nullable=False),
        marks=pytest.mark.xfail(reason="spark.toPandas() problem: toPandas() converts Array columns into String. "
                                       "The test then fails with the same problem as with String columns. "
                                       "See https://github.com/NVIDIA/spark-rapids/issues/9387.")),
], ids=idfn)
def test_reading_file_written_with_fastparquet(column_gen, spark_tmp_path):
data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH"

def write_with_fastparquet(spark, data_gen):
# TODO: (future) Compression settings?
import fastparquet
dataframe = gen_df(spark, data_gen, 2048)
fastparquet.write(data_path, dataframe.toPandas())
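        # (Hedged sketch for the TODO above, untested here): fastparquet.write()
        # also accepts a `compression` argument, e.g.
        #   fastparquet.write(data_path, dataframe.toPandas(), compression='SNAPPY')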

gen = StructGen([('a', column_gen),
('part', IntegerGen(nullable=False))
], nullable=False)
# Write data with CPU session.
with_cpu_session(
lambda spark: write_with_fastparquet(spark, gen)
)
# Read Parquet with CPU (Apache Spark) and GPU (plugin), and compare records.
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.parquet(data_path),
rebase_write_corrected_conf)