From 74d31f1280c4e8ca1c361037fee73f8f1b3f4417 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 29 Sep 2023 10:37:18 -0700 Subject: [PATCH 01/22] WIP: Initial stab at fastparquet tests. Signed-off-by: MithunR --- .../python/fastparquet_compatibility_test.py | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 integration_tests/src/main/python/fastparquet_compatibility_test.py diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py new file mode 100644 index 00000000000..918bfc2b247 --- /dev/null +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -0,0 +1,69 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_fallback_collect, assert_gpu_sql_fallback_collect +from data_gen import * +from marks import * +from pyspark.sql.types import * +from pyspark.sql.types import NumericType +from pyspark.sql.window import Window +import pyspark.sql.functions as f +from spark_session import is_before_spark_320, is_databricks113_or_later, spark_version, with_cpu_session +import warnings +import fastparquet as fastpak + + +def read_parquet(data_path): + """ + (Fetches a function that) reads Parquet from the specified `data_path`. + If the plugin is enabled, the read is done via Spark APIs, through the plugin. + If the plugin is disabled, the data is read via `fastparquet`. + :param data_path: Location of the (single) Parquet input file. + :return: A function that reads Parquet, via the plugin or `fastparquet`. + """ + def read_with_fastparquet_or_plugin(spark): + plugin_enabled = spark.conf.get("spark.rapids.sql.enabled", "false") == "true" + if plugin_enabled: + return spark.read.parquet(data_path) + else: + df = fastpak.ParquetFile(data_path).to_pandas() + return spark.createDataFrame(df) + return read_with_fastparquet_or_plugin + + +@pytest.mark.parametrize('data_gen', [ + ByteGen(nullable=False), + ShortGen(nullable=False), + IntegerGen(nullable=False), + # pytest.param(IntegerGen(nullable=True), + # marks=pytest.mark.xfail(reason="Nullable Integers are promoted to bigint by fastparquet")), + LongGen(nullable=False), + # pytest.param(LongGen(nullable=True), + # marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")), + FloatGen(nullable=False), + DoubleGen(nullable=False), + StringGen(nullable=False), +], ids=idfn) +def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path): + data_path = spark_tmp_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT" + gen = StructGen([('a', data_gen)], nullable=False) + # Write data with CPU session. + with_cpu_session( + lambda spark: gen_df(spark, gen, 100).repartition(1).write.mode('overwrite').parquet(data_path) + ) + # Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records. 
+ assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path)) + From d3394dcd2c7f5806d868e6ae23ec630d91516942 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 29 Sep 2023 12:44:30 -0700 Subject: [PATCH 02/22] Date tests. Plus minor refactor. --- .../python/fastparquet_compatibility_test.py | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index 918bfc2b247..6147b392e6a 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -14,7 +14,8 @@ import pytest -from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_fallback_collect, assert_gpu_sql_fallback_collect +from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_fallback_collect, \ + assert_gpu_sql_fallback_collect from data_gen import * from marks import * from pyspark.sql.types import * @@ -34,13 +35,16 @@ def read_parquet(data_path): :param data_path: Location of the (single) Parquet input file. :return: A function that reads Parquet, via the plugin or `fastparquet`. """ + def read_with_fastparquet_or_plugin(spark): plugin_enabled = spark.conf.get("spark.rapids.sql.enabled", "false") == "true" if plugin_enabled: return spark.read.parquet(data_path) else: + # return spark.read.parquet(data_path) df = fastpak.ParquetFile(data_path).to_pandas() return spark.createDataFrame(df) + return read_with_fastparquet_or_plugin @@ -48,22 +52,43 @@ def read_with_fastparquet_or_plugin(spark): ByteGen(nullable=False), ShortGen(nullable=False), IntegerGen(nullable=False), - # pytest.param(IntegerGen(nullable=True), - # marks=pytest.mark.xfail(reason="Nullable Integers are promoted to bigint by fastparquet")), + pytest.param(IntegerGen(nullable=True), + marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")), LongGen(nullable=False), - # pytest.param(LongGen(nullable=True), - # marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")), + pytest.param(LongGen(nullable=True), + marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")), FloatGen(nullable=False), DoubleGen(nullable=False), StringGen(nullable=False), + pytest.param(DecimalGen(nullable=False), + marks=pytest.mark.xfail(reason="fastparquet reads Decimal columns as Float, as per " + "https://fastparquet.readthedocs.io/en/latest/details.html#data-types")), ], ids=idfn) def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path): data_path = spark_tmp_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT" gen = StructGen([('a', data_gen)], nullable=False) # Write data with CPU session. with_cpu_session( - lambda spark: gen_df(spark, gen, 100).repartition(1).write.mode('overwrite').parquet(data_path) + lambda spark: gen_df(spark, gen, 4096).repartition(1).write.mode('overwrite').parquet(data_path) ) # Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records. assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path)) + +@pytest.mark.parametrize('corrected_conf', [{'spark.sql.parquet.datetimeRebaseModeInWrite': 'CORRECTED'}]) +@pytest.mark.parametrize('data_gen', [ + pytest.param(DateGen(nullable=False), + marks=pytest.mark.xfail(reason="fastparquet reads far future dates (e.g. 
year=8705) incorrectly.")), + pytest.param(DateGen(nullable=False, start=date(year=2020, month=1, day=1), end=date(year=2020, month=12, day=31)), + marks=pytest.mark.xfail(reason="fastparquet reads dates as datetime.")) +], ids=idfn) +def test_read_fastparquet_single_date_column_tables(data_gen, spark_tmp_path, corrected_conf): + data_path = spark_tmp_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT" + gen = StructGen([('a', data_gen)], nullable=False) + # Write data with CPU session. + with_cpu_session( + lambda spark: gen_df(spark, gen, 2048).repartition(1).write.mode('overwrite').parquet(data_path), + conf=corrected_conf + ) + # Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records. + assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), conf=corrected_conf) From 27ab6d9657057bc59f7e4d4001db8f999ca03920 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 29 Sep 2023 13:11:13 -0700 Subject: [PATCH 03/22] Date/Time tests. Also, recombined the test functions. --- .../python/fastparquet_compatibility_test.py | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index 6147b392e6a..64d51142475 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -16,6 +16,7 @@ from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_fallback_collect, \ assert_gpu_sql_fallback_collect +from datetime import date, datetime, timezone from data_gen import * from marks import * from pyspark.sql.types import * @@ -26,6 +27,11 @@ import warnings import fastparquet as fastpak +rebase_write_corrected_conf = { + 'spark.sql.parquet.datetimeRebaseModeInWrite': 'CORRECTED', + 'spark.sql.parquet.int96RebaseModeInWrite': 'CORRECTED' +} + def read_parquet(data_path): """ @@ -48,6 +54,7 @@ def read_with_fastparquet_or_plugin(spark): return read_with_fastparquet_or_plugin +@pytest.mark.parametrize('corrected_conf', [rebase_write_corrected_conf]) @pytest.mark.parametrize('data_gen', [ ByteGen(nullable=False), ShortGen(nullable=False), @@ -63,30 +70,26 @@ def read_with_fastparquet_or_plugin(spark): pytest.param(DecimalGen(nullable=False), marks=pytest.mark.xfail(reason="fastparquet reads Decimal columns as Float, as per " "https://fastparquet.readthedocs.io/en/latest/details.html#data-types")), -], ids=idfn) -def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path): - data_path = spark_tmp_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT" - gen = StructGen([('a', data_gen)], nullable=False) - # Write data with CPU session. - with_cpu_session( - lambda spark: gen_df(spark, gen, 4096).repartition(1).write.mode('overwrite').parquet(data_path) - ) - # Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records. - assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path)) - - -@pytest.mark.parametrize('corrected_conf', [{'spark.sql.parquet.datetimeRebaseModeInWrite': 'CORRECTED'}]) -@pytest.mark.parametrize('data_gen', [ + pytest.param(DateGen(nullable=False, + start=date(year=2020, month=1, day=1), + end=date(year=2020, month=12, day=31)), + marks=pytest.mark.xfail(reason="fastparquet reads dates as timestamps.")), pytest.param(DateGen(nullable=False), marks=pytest.mark.xfail(reason="fastparquet reads far future dates (e.g. 
year=8705) incorrectly.")), - pytest.param(DateGen(nullable=False, start=date(year=2020, month=1, day=1), end=date(year=2020, month=12, day=31)), - marks=pytest.mark.xfail(reason="fastparquet reads dates as datetime.")) + TimestampGen(nullable=False, + start=datetime(2000, 1, 1, tzinfo=timezone.utc), + end=datetime(2200, 12, 31, tzinfo=timezone.utc)), # Vanilla case. + pytest.param(TimestampGen(nullable=False, + start=datetime(1, 1, 1, tzinfo=timezone.utc), + end=datetime(1899, 12, 31, tzinfo=timezone.utc)), + marks=pytest.mark.xfail(reason="fastparquet reads timestamps preceding 1900 incorrectly.")), ], ids=idfn) -def test_read_fastparquet_single_date_column_tables(data_gen, spark_tmp_path, corrected_conf): +def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path, corrected_conf): data_path = spark_tmp_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT" gen = StructGen([('a', data_gen)], nullable=False) # Write data with CPU session. with_cpu_session( + # Single output file, to avoid differences in order of file reads. lambda spark: gen_df(spark, gen, 2048).repartition(1).write.mode('overwrite').parquet(data_path), conf=corrected_conf ) From b3099185e9650dac53d8a2a736a4fd31dd8b8329 Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 2 Oct 2023 11:37:02 -0700 Subject: [PATCH 04/22] Added tests for reading data written with fastparquet. --- .../python/fastparquet_compatibility_test.py | 64 ++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index 64d51142475..efffe4c97a5 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -25,7 +25,6 @@ import pyspark.sql.functions as f from spark_session import is_before_spark_320, is_databricks113_or_later, spark_version, with_cpu_session import warnings -import fastparquet as fastpak rebase_write_corrected_conf = { 'spark.sql.parquet.datetimeRebaseModeInWrite': 'CORRECTED', @@ -47,6 +46,7 @@ def read_with_fastparquet_or_plugin(spark): if plugin_enabled: return spark.read.parquet(data_path) else: + import fastparquet as fastpak # return spark.read.parquet(data_path) df = fastpak.ParquetFile(data_path).to_pandas() return spark.createDataFrame(df) @@ -95,3 +95,65 @@ def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path, correct ) # Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records. 
assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), conf=corrected_conf) + + +@pytest.mark.parametrize('column_gen', [ + ByteGen(nullable=False), + ByteGen(nullable=True), + ShortGen(nullable=False), + ShortGen(nullable=True), + IntegerGen(nullable=False), + IntegerGen(nullable=True), + LongGen(nullable=False), + LongGen(nullable=True), + FloatGen(nullable=False), + FloatGen(nullable=True), + DoubleGen(nullable=False), + DoubleGen(nullable=True), + DecimalGen(nullable=False), + DecimalGen(nullable=True), + pytest.param( + StringGen(nullable=False), + marks=pytest.mark.xfail(reason="String columns written with fastparquet are read differently between " + "Apache Spark, and the Spark RAPIDS plugin.")), + pytest.param( + StringGen(nullable=True), + marks=pytest.mark.xfail(reason="String columns written with fastparquet are read differently between " + "Apache Spark, and the Spark RAPIDS plugin.")), + pytest.param( + DateGen(nullable=False, + start=date(year=2000, month=1, day=1), + end=date(year=2020, month=12, day=31)), + marks=pytest.mark.xfail(reason="TODO: Test problem: Dates generated by data_gen can't be written " + "with fastparquet, because the dtype/encoding cannot be deduced.")), + pytest.param( + TimestampGen(nullable=False), + marks=pytest.mark.xfail(reason="Timestamps exceeding year=2300 are out of bounds for Pandas.")), + pytest.param( + TimestampGen(nullable=False, + start=datetime(2000, 1, 1, tzinfo=timezone.utc), + end=datetime(2200, 12, 31, tzinfo=timezone.utc)), + marks=pytest.mark.xfail(reason="TODO: Test problem: Timestamps generated by data_gen can't be converted " + "to pandas, because of type errors. ")), +], ids=idfn) +def test_reading_file_written_with_fastparquet(column_gen, spark_tmp_path): + data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH" + # data_path = "/tmp/FASTPARQUET_WRITE_PATH" + + def write_with_fastparquet(spark, data_gen): + # TODO: Provide compression options. + import fastparquet + dataframe = gen_df(spark, data_gen, 2048) + fastparquet.write(data_path, dataframe.toPandas()) + + gen = StructGen([('a', column_gen), + # ('part', IntegerGen(nullable=False)) + ], nullable=False) + # Write data with CPU session. + with_cpu_session( + lambda spark: write_with_fastparquet(spark, gen) + ) + # Read Parquet with CPU (Apache Spark) and GPU (plugin), and compare records. + assert_gpu_and_cpu_are_equal_collect( + lambda spark: spark.read.parquet(data_path), + rebase_write_corrected_conf) From 45000f92c242132bddfe157995d44c243330f1f3 Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 2 Oct 2023 14:24:20 -0700 Subject: [PATCH 05/22] Tests for reading GPU-written files. 
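
This verifies that a Parquet file written through the RAPIDS plugin reads
back identically via fastparquet and via the plugin. As a rough standalone
sketch outside the test harness (the path and the data are illustrative,
not part of the test):

    import fastparquet
    from spark_session import with_gpu_session

    data_path = "/tmp/gpu_written_parquet"  # illustrative location
    # Write a single file with the plugin enabled.
    with_gpu_session(
        lambda spark: spark.range(100).repartition(1)
                           .write.mode("overwrite").parquet(data_path))
    # Read it back on the CPU side with fastparquet, for comparison.
    print(fastparquet.ParquetFile(data_path).to_pandas().head())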
--- .../python/fastparquet_compatibility_test.py | 63 +++++++++++++++---- 1 file changed, 51 insertions(+), 12 deletions(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index efffe4c97a5..1fce712c7d3 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -14,17 +14,11 @@ import pytest -from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_fallback_collect, \ - assert_gpu_sql_fallback_collect +from asserts import assert_gpu_and_cpu_are_equal_collect from datetime import date, datetime, timezone from data_gen import * -from marks import * from pyspark.sql.types import * -from pyspark.sql.types import NumericType -from pyspark.sql.window import Window -import pyspark.sql.functions as f -from spark_session import is_before_spark_320, is_databricks113_or_later, spark_version, with_cpu_session -import warnings +from spark_session import with_cpu_session, with_gpu_session rebase_write_corrected_conf = { 'spark.sql.parquet.datetimeRebaseModeInWrite': 'CORRECTED', @@ -34,7 +28,7 @@ def read_parquet(data_path): """ - (Fetches a function that) reads Parquet from the specified `data_path`. + (Fetches a function that) Reads Parquet from the specified `data_path`. If the plugin is enabled, the read is done via Spark APIs, through the plugin. If the plugin is disabled, the data is read via `fastparquet`. :param data_path: Location of the (single) Parquet input file. @@ -97,6 +91,52 @@ def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path, correct assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), conf=corrected_conf) +@pytest.mark.parametrize('column_gen', [ + ByteGen(nullable=False), + ShortGen(nullable=False), + IntegerGen(nullable=False), + pytest.param(IntegerGen(nullable=True), + marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")), + LongGen(nullable=False), + pytest.param(LongGen(nullable=True), + marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")), + FloatGen(nullable=False), + DoubleGen(nullable=False), + StringGen(nullable=False), + pytest.param(DecimalGen(nullable=False), + marks=pytest.mark.xfail(reason="fastparquet reads Decimal columns as Float, as per " + "https://fastparquet.readthedocs.io/en/latest/details.html#data-types")), + pytest.param(DateGen(nullable=False, + start=date(year=2020, month=1, day=1), + end=date(year=2020, month=12, day=31)), + marks=pytest.mark.xfail(reason="fastparquet reads dates as timestamps.")), + pytest.param(DateGen(nullable=False), + marks=pytest.mark.xfail(reason="fastparquet reads far future dates (e.g. year=8705) incorrectly.")), + TimestampGen(nullable=False, + start=datetime(2000, 1, 1, tzinfo=timezone.utc), + end=datetime(2200, 12, 31, tzinfo=timezone.utc)), # Vanilla case. 
+ pytest.param(TimestampGen(nullable=False, + start=datetime(1, 1, 1, tzinfo=timezone.utc), + end=datetime(1899, 12, 31, tzinfo=timezone.utc)), + marks=pytest.mark.xfail(reason="fastparquet reads timestamps preceding 1900 incorrectly.")), +], ids=idfn) +def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): + data_path = spark_tmp_path + "/FASTPARQUET_TEST_GPU_WRITE_PATH" + + gen = StructGen([('a', column_gen), + ('part', IntegerGen(nullable=False)) + ], nullable=False) + # Write data out with Spark RAPIDS plugin. + with_gpu_session( + lambda spark: gen_df(spark, gen, 2048).repartition(1).write.mode('overwrite').parquet(data_path), + conf=rebase_write_corrected_conf + ) + + # TODO: Maybe make _assert_equal() available to compare dataframes, regardless of CPU vs GPU? + # For now, this compares the results of reading back the GPU-written data, via fastparquet and GPU. + assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), conf=rebase_write_corrected_conf) + + @pytest.mark.parametrize('column_gen', [ ByteGen(nullable=False), ByteGen(nullable=True), @@ -138,16 +178,15 @@ def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path, correct ], ids=idfn) def test_reading_file_written_with_fastparquet(column_gen, spark_tmp_path): data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH" - # data_path = "/tmp/FASTPARQUET_WRITE_PATH" def write_with_fastparquet(spark, data_gen): - # TODO: Provide compression options. + # TODO: (future) Compression settings? import fastparquet dataframe = gen_df(spark, data_gen, 2048) fastparquet.write(data_path, dataframe.toPandas()) gen = StructGen([('a', column_gen), - # ('part', IntegerGen(nullable=False)) + ('part', IntegerGen(nullable=False)) ], nullable=False) # Write data with CPU session. with_cpu_session( From cad94bb32d20e4a950c0b6387cae752bfbca469b Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 4 Oct 2023 21:31:04 -0700 Subject: [PATCH 06/22] Added failing tests for arrays, struct. Plus, minor refactors. --- .../python/fastparquet_compatibility_test.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index 1fce712c7d3..7d2e10580d8 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -40,9 +40,8 @@ def read_with_fastparquet_or_plugin(spark): if plugin_enabled: return spark.read.parquet(data_path) else: - import fastparquet as fastpak - # return spark.read.parquet(data_path) - df = fastpak.ParquetFile(data_path).to_pandas() + import fastparquet + df = fastparquet.ParquetFile(data_path).to_pandas() return spark.createDataFrame(df) return read_with_fastparquet_or_plugin @@ -77,6 +76,11 @@ def read_with_fastparquet_or_plugin(spark): start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc)), marks=pytest.mark.xfail(reason="fastparquet reads timestamps preceding 1900 incorrectly.")), + # TODO: Array gen type deduction borked when converting from Pandas to Spark dataframe. + # ArrayGen(child_gen=IntegerGen(nullable=False), nullable=False), + # TODO: Struct rows seem to be correct, but are failing comparison because of differences in Row representation. 
+ # StructGen(children=[("first", IntegerGen(nullable=False)), + # ("second", FloatGen(nullable=False))], nullable=False) ], ids=idfn) def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path, corrected_conf): data_path = spark_tmp_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT" @@ -84,7 +88,7 @@ def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path, correct # Write data with CPU session. with_cpu_session( # Single output file, to avoid differences in order of file reads. - lambda spark: gen_df(spark, gen, 2048).repartition(1).write.mode('overwrite').parquet(data_path), + lambda spark: gen_df(spark, gen, 3).repartition(1).write.mode('overwrite').parquet(data_path), conf=corrected_conf ) # Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records. @@ -155,11 +159,13 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): pytest.param( StringGen(nullable=False), marks=pytest.mark.xfail(reason="String columns written with fastparquet are read differently between " - "Apache Spark, and the Spark RAPIDS plugin.")), + "Apache Spark, and the Spark RAPIDS plugin. " + "See .")), pytest.param( StringGen(nullable=True), marks=pytest.mark.xfail(reason="String columns written with fastparquet are read differently between " - "Apache Spark, and the Spark RAPIDS plugin.")), + "Apache Spark, and the Spark RAPIDS plugin. " + "See .")), pytest.param( DateGen(nullable=False, start=date(year=2000, month=1, day=1), @@ -175,6 +181,11 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): end=datetime(2200, 12, 31, tzinfo=timezone.utc)), marks=pytest.mark.xfail(reason="TODO: Test problem: Timestamps generated by data_gen can't be converted " "to pandas, because of type errors. ")), + pytest.param( + ArrayGen(IntegerGen(nullable=False), nullable=False), + marks.pytest.mark.xfail(reason="Array columns written with fastparquet are read differently between " + "Apache Spark and the Spark RAPIDS plugin. " + "See .")), ], ids=idfn) def test_reading_file_written_with_fastparquet(column_gen, spark_tmp_path): data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH" From 725c316d099d6d8bacffa10c360ea1ba14ec952b Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 4 Oct 2023 23:07:04 -0700 Subject: [PATCH 07/22] Clarification of failure conditions. --- integration_tests/requirements.txt | 3 ++- .../python/fastparquet_compatibility_test.py | 18 ++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/integration_tests/requirements.txt b/integration_tests/requirements.txt index 7ffeb1122cc..eabd01ba8b7 100644 --- a/integration_tests/requirements.txt +++ b/integration_tests/requirements.txt @@ -16,4 +16,5 @@ sre_yield pandas pyarrow pytest-xdist >= 2.0.0 -findspark \ No newline at end of file +findspark +fastparquet \ No newline at end of file diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index 7d2e10580d8..6661a043679 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -160,17 +160,17 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): StringGen(nullable=False), marks=pytest.mark.xfail(reason="String columns written with fastparquet are read differently between " "Apache Spark, and the Spark RAPIDS plugin. 
" - "See .")), + "See https://github.com/NVIDIA/spark-rapids/issues/9387.")), pytest.param( StringGen(nullable=True), marks=pytest.mark.xfail(reason="String columns written with fastparquet are read differently between " "Apache Spark, and the Spark RAPIDS plugin. " - "See .")), + "See https://github.com/NVIDIA/spark-rapids/issues/9387.")), pytest.param( DateGen(nullable=False, start=date(year=2000, month=1, day=1), end=date(year=2020, month=12, day=31)), - marks=pytest.mark.xfail(reason="TODO: Test problem: Dates generated by data_gen can't be written " + marks=pytest.mark.xfail(reason="spark_df.toPandas() problem: Dates generated in Spark can't be written " "with fastparquet, because the dtype/encoding cannot be deduced.")), pytest.param( TimestampGen(nullable=False), @@ -179,13 +179,15 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): TimestampGen(nullable=False, start=datetime(2000, 1, 1, tzinfo=timezone.utc), end=datetime(2200, 12, 31, tzinfo=timezone.utc)), - marks=pytest.mark.xfail(reason="TODO: Test problem: Timestamps generated by data_gen can't be converted " - "to pandas, because of type errors. ")), + marks=pytest.mark.xfail(reason="spark_df.toPandas() problem: Timestamps in Spark can't be " + "converted to pandas, because of type errors. The error states: " + "\"TypeError: Casting to unit-less dtype 'datetime64' is not supported. " + "Pass e.g. 'datetime64[ns]' instead.\"")), pytest.param( ArrayGen(IntegerGen(nullable=False), nullable=False), - marks.pytest.mark.xfail(reason="Array columns written with fastparquet are read differently between " - "Apache Spark and the Spark RAPIDS plugin. " - "See .")), + marks.pytest.mark.xfail(reason="spark.toPandas() problem: toPandas() converts Array columns into String. " + "The test then fails with the same problem as with String columns. " + "See https://github.com/NVIDIA/spark-rapids/issues/9387.")), ], ids=idfn) def test_reading_file_written_with_fastparquet(column_gen, spark_tmp_path): data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH" From 1b141c266e1a1f091115c2c5bde9b589962b3985 Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 5 Oct 2023 13:57:05 -0700 Subject: [PATCH 08/22] Workaround tests for timestamps. --- .../python/fastparquet_compatibility_test.py | 60 ++++++++++++++++++- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index 6661a043679..d28752a1ec1 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -182,10 +182,11 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): marks=pytest.mark.xfail(reason="spark_df.toPandas() problem: Timestamps in Spark can't be " "converted to pandas, because of type errors. The error states: " "\"TypeError: Casting to unit-less dtype 'datetime64' is not supported. " - "Pass e.g. 'datetime64[ns]' instead.\"")), + "Pass e.g. 'datetime64[ns]' instead.\" This test setup has a workaround in " + "test_reading_file_written_with_workaround_fastparquet")), pytest.param( ArrayGen(IntegerGen(nullable=False), nullable=False), - marks.pytest.mark.xfail(reason="spark.toPandas() problem: toPandas() converts Array columns into String. " + marks=pytest.mark.xfail(reason="spark.toPandas() problem: toPandas() converts Array columns into String. " "The test then fails with the same problem as with String columns. 
" "See https://github.com/NVIDIA/spark-rapids/issues/9387.")), ], ids=idfn) @@ -209,3 +210,58 @@ def write_with_fastparquet(spark, data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark: spark.read.parquet(data_path), rebase_write_corrected_conf) + + +@pytest.mark.parametrize('column_gen, time_format', [ + pytest.param( + TimestampGen(nullable=False, + start=datetime(2000, 1, 1, tzinfo=timezone.utc), + end=datetime(2200, 12, 31, tzinfo=timezone.utc)), 'int64', + marks=pytest.mark.xfail(reason="Apache Spark and the plugin both have problems reading timestamps written via " + "fastparquet, if written in int64: " + "\"Illegal Parquet type: INT64 (TIMESTAMP(NANOS,false)).\"")), + (TimestampGen(nullable=False, + start=datetime(2000, 1, 1, tzinfo=timezone.utc), + end=datetime(2200, 12, 31, tzinfo=timezone.utc)), 'int96'), + (TimestampGen(nullable=True, + start=datetime(2000, 1, 1, tzinfo=timezone.utc), + end=datetime(2200, 12, 31, tzinfo=timezone.utc)), 'int96'), + pytest.param( + TimestampGen(nullable=False), 'int96', + marks=pytest.mark.xfail(reason="fastparquet does not support int96RebaseModeInWrite, for dates before " + "1582-10-15 or timestamps before 1900-01-01T00:00:00Z. " + "This messes up reads from Apache Spark and the plugin.")), +], ids=idfn) +def test_reading_file_rewritten_with_fastparquet(column_gen, time_format, spark_tmp_path): + """ + This test is a workaround to test data-types that have problems being converted + from Spark dataframes to Pandas dataframes. + For instance, sparkDF.toPandas() incorrectly converts ARRAY columns into + STRING columns. + This test writes the Spark dataframe into a temporary file, and then uses + `fastparquet` to read and write the file again, to the final destination. + The final file should be in the correct format, with the right datatypes. + This is then checked for read-accuracy, via CPU and GPU. + """ + data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH" + data_path = "/tmp/FASTPARQUET_WRITE_PATH" + + def rewrite_with_fastparquet(spark, data_gen): + import fastparquet + tmp_data_path = data_path + "_tmp" + spark_df = gen_df(spark, data_gen, 2048) + spark_df.repartition(1).write.mode("overwrite").parquet(tmp_data_path) + pandas_df = fastparquet.ParquetFile(tmp_data_path).to_pandas() + fastparquet.write(data_path, pandas_df, times=time_format) + + gen = StructGen([('a', column_gen), + ('part', IntegerGen(nullable=False))], nullable=False) + # Write data with CPU session. + with_cpu_session( + lambda spark: rewrite_with_fastparquet(spark, gen) + ) + # Read Parquet with CPU (Apache Spark) and GPU (plugin), and compare records. + assert_gpu_and_cpu_are_equal_collect( + lambda spark: spark.read.parquet(data_path), + rebase_write_corrected_conf) + From 641fa140a19f0d31dc3d76880cc8156bee3a218b Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 5 Oct 2023 14:09:53 -0700 Subject: [PATCH 09/22] Workaround tests for dates. 
--- .../python/fastparquet_compatibility_test.py | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index d28752a1ec1..6f8db769b42 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -171,7 +171,8 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): start=date(year=2000, month=1, day=1), end=date(year=2020, month=12, day=31)), marks=pytest.mark.xfail(reason="spark_df.toPandas() problem: Dates generated in Spark can't be written " - "with fastparquet, because the dtype/encoding cannot be deduced.")), + "with fastparquet, because the dtype/encoding cannot be deduced. " + "This test has a workaround in test_reading_file_rewritten_with_fastparquet.")), pytest.param( TimestampGen(nullable=False), marks=pytest.mark.xfail(reason="Timestamps exceeding year=2300 are out of bounds for Pandas.")), @@ -183,7 +184,7 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): "converted to pandas, because of type errors. The error states: " "\"TypeError: Casting to unit-less dtype 'datetime64' is not supported. " "Pass e.g. 'datetime64[ns]' instead.\" This test setup has a workaround in " - "test_reading_file_written_with_workaround_fastparquet")), + "test_reading_file_rewritten_with_fastparquet.")), pytest.param( ArrayGen(IntegerGen(nullable=False), nullable=False), marks=pytest.mark.xfail(reason="spark.toPandas() problem: toPandas() converts Array columns into String. " @@ -213,6 +214,24 @@ def write_with_fastparquet(spark, data_gen): @pytest.mark.parametrize('column_gen, time_format', [ + pytest.param( + DateGen(nullable=False, + start=date(year=2000, month=1, day=1), + end=date(year=2020, month=12, day=31)), 'int64', + marks=pytest.mark.xfail(reason="Apache Spark and the plugin both have problems reading dates written via " + "fastparquet, if written in int64: " + "\"Illegal Parquet type: INT64 (TIMESTAMP(NANOS,false)).\"")), + pytest.param( + DateGen(nullable=False), 'int96', + marks=pytest.mark.xfail(reason="fastparquet does not support int96RebaseModeInWrite, for dates before " + "1582-10-15 or timestamps before 1900-01-01T00:00:00Z. " + "This messes up reads from Apache Spark and the plugin.")), + (DateGen(nullable=False, + start=date(year=2000, month=1, day=1), + end=date(year=2020, month=12, day=31)), 'int96'), + (DateGen(nullable=True, + start=date(year=2000, month=1, day=1), + end=date(year=2020, month=12, day=31)), 'int96'), pytest.param( TimestampGen(nullable=False, start=datetime(2000, 1, 1, tzinfo=timezone.utc), @@ -264,4 +283,3 @@ def rewrite_with_fastparquet(spark, data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark: spark.read.parquet(data_path), rebase_write_corrected_conf) - From ef9c5f1a986f7b412106a300739acb2f2dc0efef Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 5 Oct 2023 14:28:28 -0700 Subject: [PATCH 10/22] Miscellaneous fixes: 1. Fixed dataframe size in test_read_fastparquet_single_column_tables. 2. Removed unnecessary test params for write confs. 3. Moved fastparquet import to top of test file. 4. Expanded on test failure description for timestamps. 
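
For item 4, the underlying Pandas behavior can be reproduced directly,
assuming pandas >= 2.0 (where unit-less datetime casts were removed):

    import pandas as pd

    s = pd.Series([pd.Timestamp("2000-01-01")])
    try:
        s.astype("datetime64")          # unit-less dtype: raises TypeError
    except TypeError as err:
        print(err)                      # suggests 'datetime64[ns]' instead
    s = s.astype("datetime64[ns]")      # unit-qualified cast succeeds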
--- .../python/fastparquet_compatibility_test.py | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index 6f8db769b42..12464d8dbee 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -15,9 +15,8 @@ import pytest from asserts import assert_gpu_and_cpu_are_equal_collect -from datetime import date, datetime, timezone from data_gen import * -from pyspark.sql.types import * +import fastparquet from spark_session import with_cpu_session, with_gpu_session rebase_write_corrected_conf = { @@ -40,14 +39,12 @@ def read_with_fastparquet_or_plugin(spark): if plugin_enabled: return spark.read.parquet(data_path) else: - import fastparquet df = fastparquet.ParquetFile(data_path).to_pandas() return spark.createDataFrame(df) return read_with_fastparquet_or_plugin -@pytest.mark.parametrize('corrected_conf', [rebase_write_corrected_conf]) @pytest.mark.parametrize('data_gen', [ ByteGen(nullable=False), ShortGen(nullable=False), @@ -82,17 +79,17 @@ def read_with_fastparquet_or_plugin(spark): # StructGen(children=[("first", IntegerGen(nullable=False)), # ("second", FloatGen(nullable=False))], nullable=False) ], ids=idfn) -def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path, corrected_conf): +def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path): data_path = spark_tmp_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT" gen = StructGen([('a', data_gen)], nullable=False) # Write data with CPU session. with_cpu_session( # Single output file, to avoid differences in order of file reads. - lambda spark: gen_df(spark, gen, 3).repartition(1).write.mode('overwrite').parquet(data_path), - conf=corrected_conf + lambda spark: gen_df(spark, gen, 2048).repartition(1).write.mode('overwrite').parquet(data_path), + conf=rebase_write_corrected_conf ) # Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records. - assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), conf=corrected_conf) + assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path)) @pytest.mark.parametrize('column_gen', [ @@ -175,7 +172,10 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): "This test has a workaround in test_reading_file_rewritten_with_fastparquet.")), pytest.param( TimestampGen(nullable=False), - marks=pytest.mark.xfail(reason="Timestamps exceeding year=2300 are out of bounds for Pandas.")), + marks=pytest.mark.xfail(reason="Old timestamps are out of bounds for Pandas. E.g.: " + "\"pandas._libs.tslibs.np_datetime.OutOfBoundsDatetime: Out of bounds " + "nanosecond timestamp: 740-07-19 18:09:56\"." + "This test has a workaround in test_reading_file_rewritten_with_fastparquet.")), pytest.param( TimestampGen(nullable=False, start=datetime(2000, 1, 1, tzinfo=timezone.utc), @@ -183,7 +183,7 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): marks=pytest.mark.xfail(reason="spark_df.toPandas() problem: Timestamps in Spark can't be " "converted to pandas, because of type errors. The error states: " "\"TypeError: Casting to unit-less dtype 'datetime64' is not supported. " - "Pass e.g. 'datetime64[ns]' instead.\" This test setup has a workaround in " + "Pass e.g. 
'datetime64[ns]' instead.\" This test has a workaround in " "test_reading_file_rewritten_with_fastparquet.")), pytest.param( ArrayGen(IntegerGen(nullable=False), nullable=False), @@ -196,7 +196,6 @@ def test_reading_file_written_with_fastparquet(column_gen, spark_tmp_path): def write_with_fastparquet(spark, data_gen): # TODO: (future) Compression settings? - import fastparquet dataframe = gen_df(spark, data_gen, 2048) fastparquet.write(data_path, dataframe.toPandas()) @@ -266,7 +265,6 @@ def test_reading_file_rewritten_with_fastparquet(column_gen, time_format, spark_ data_path = "/tmp/FASTPARQUET_WRITE_PATH" def rewrite_with_fastparquet(spark, data_gen): - import fastparquet tmp_data_path = data_path + "_tmp" spark_df = gen_df(spark, data_gen, 2048) spark_df.repartition(1).write.mode("overwrite").parquet(tmp_data_path) From 782e07640aad11bc74ba4c982f5d348efe9ed524 Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 5 Oct 2023 14:41:14 -0700 Subject: [PATCH 11/22] Test descriptions. --- .../python/fastparquet_compatibility_test.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index 12464d8dbee..b4357ca2de2 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -80,6 +80,12 @@ def read_with_fastparquet_or_plugin(spark): # ("second", FloatGen(nullable=False))], nullable=False) ], ids=idfn) def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path): + """ + This test writes data_gen output to Parquet via Apache Spark, then verifies that fastparquet and the RAPIDS + plugin read the data identically. + There are xfails here because of limitations in converting Spark dataframes to Pandas, if they contain nulls, + as well as limitations in fastparquet's handling of Dates, Timestamps, Decimals, etc. + """ data_path = spark_tmp_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT" gen = StructGen([('a', data_gen)], nullable=False) # Write data with CPU session. @@ -122,6 +128,11 @@ def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path): marks=pytest.mark.xfail(reason="fastparquet reads timestamps preceding 1900 incorrectly.")), ], ids=idfn) def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): + """ + This test writes the data-gen output to file via the RAPIDS plugin, then checks that the data is read identically + via fastparquet and Spark. + There are xfails here because of fastparquet limitations in handling Decimal, Timestamps, Dates, etc. + """ data_path = spark_tmp_path + "/FASTPARQUET_TEST_GPU_WRITE_PATH" gen = StructGen([('a', column_gen), @@ -189,9 +200,14 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): ArrayGen(IntegerGen(nullable=False), nullable=False), marks=pytest.mark.xfail(reason="spark.toPandas() problem: toPandas() converts Array columns into String. " "The test then fails with the same problem as with String columns. " - "See https://github.com/NVIDIA/spark-rapids/issues/9387.")), + "See https://github.com/NVIDIA/spark-rapids/issues/9387." 
+ "This test has a workaround in test_reading_file_rewritten_with_fastparquet.")), ], ids=idfn) def test_reading_file_written_with_fastparquet(column_gen, spark_tmp_path): + """ + This test writes data-gen output with fastparquet, and checks that both Apache Spark and the RAPIDS plugin + read the written data correctly. + """ data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH" def write_with_fastparquet(spark, data_gen): From 411612ead2f0cae1d1d00d0c0a98a531f606e43b Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 5 Oct 2023 15:27:37 -0700 Subject: [PATCH 12/22] Workaround tests for STRUCT, ARRAY, etc. --- .../src/main/python/fastparquet_compatibility_test.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index b4357ca2de2..a0484500f40 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -265,6 +265,16 @@ def write_with_fastparquet(spark, data_gen): marks=pytest.mark.xfail(reason="fastparquet does not support int96RebaseModeInWrite, for dates before " "1582-10-15 or timestamps before 1900-01-01T00:00:00Z. " "This messes up reads from Apache Spark and the plugin.")), + pytest.param( + ArrayGen(nullable=False, child_gen=IntegerGen(nullable=False)), 'int96', + marks=pytest.mark.xfail(reason="fastparquet fails to serialize array elements with any available encoding. " + "E.g. \"Error converting column 'a' to bytes using encoding JSON. " + "Original error: Object of type int32 is not JSON serializable\".")), + (StructGen(nullable=False, children=[('first', IntegerGen(nullable=False))]), 'int96'), + pytest.param( + StructGen(nullable=True, children=[('first', IntegerGen(nullable=False))]), 'int96', + marks=pytest.mark.xfail(reason="fastparquet fails to read nullable Struct columns written from Apache Spark. " + "It fails the rewrite to parquet, thereby failing the test.")), ], ids=idfn) def test_reading_file_rewritten_with_fastparquet(column_gen, time_format, spark_tmp_path): """ @@ -278,7 +288,6 @@ def test_reading_file_rewritten_with_fastparquet(column_gen, time_format, spark_ This is then checked for read-accuracy, via CPU and GPU. """ data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH" - data_path = "/tmp/FASTPARQUET_WRITE_PATH" def rewrite_with_fastparquet(spark, data_gen): tmp_data_path = data_path + "_tmp" From 3624fac74054847c0d791c2060d0a7b9a13f00a6 Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 5 Oct 2023 16:05:12 -0700 Subject: [PATCH 13/22] Added xfails for struct/array. 
--- .../python/fastparquet_compatibility_test.py | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index a0484500f40..9df05d45afb 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -17,7 +17,7 @@ from asserts import assert_gpu_and_cpu_are_equal_collect from data_gen import * import fastparquet -from spark_session import with_cpu_session, with_gpu_session +from spark_session import spark_version, with_cpu_session, with_gpu_session rebase_write_corrected_conf = { 'spark.sql.parquet.datetimeRebaseModeInWrite': 'CORRECTED', @@ -45,6 +45,9 @@ def read_with_fastparquet_or_plugin(spark): return read_with_fastparquet_or_plugin +@pytest.mark.skipif(condition=spark_version() < "3.4.1", + reason="spark_df.to_pandas() is not reliable on prior versions of Spark. 3.4.1 and above seem to " + "work more stably.") @pytest.mark.parametrize('data_gen', [ ByteGen(nullable=False), ShortGen(nullable=False), @@ -73,13 +76,17 @@ def read_with_fastparquet_or_plugin(spark): start=datetime(1, 1, 1, tzinfo=timezone.utc), end=datetime(1899, 12, 31, tzinfo=timezone.utc)), marks=pytest.mark.xfail(reason="fastparquet reads timestamps preceding 1900 incorrectly.")), - # TODO: Array gen type deduction borked when converting from Pandas to Spark dataframe. - # ArrayGen(child_gen=IntegerGen(nullable=False), nullable=False), - # TODO: Struct rows seem to be correct, but are failing comparison because of differences in Row representation. - # StructGen(children=[("first", IntegerGen(nullable=False)), - # ("second", FloatGen(nullable=False))], nullable=False) + pytest.param( + ArrayGen(child_gen=IntegerGen(nullable=False), nullable=False), + marks=pytest.mark.xfail(reason="Conversion from Pandas dataframe to Spark dataframe fails: " + "\"Unable to infer the type of the field a\".")), + pytest.param( + StructGen(children=[("first", IntegerGen(nullable=False)), + ("second", FloatGen(nullable=False))], nullable=False), + marks=pytest.mark.xfail(reason="Values are correct, but struct row representations differ between " + "fastparquet and Spark.")) ], ids=idfn) -def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path): +def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path): """ This test writes data_gen output to Parquet via Apache Spark, then verifies that fastparquet and the RAPIDS plugin read the data identically. @@ -98,6 +105,9 @@ def test_read_fastparquet_single_column_tables(data_gen, spark_tmp_path): assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path)) +@pytest.mark.skipif(condition=spark_version() < "3.4.1", + reason="spark_df.to_pandas() is not reliable on prior versions of Spark. 3.4.1 and above seem to " + "work more stably.") @pytest.mark.parametrize('column_gen', [ ByteGen(nullable=False), ShortGen(nullable=False), From 6bcff7aa0c504abfe5aad8fba671f9b9fe4567e3 Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 5 Oct 2023 16:07:08 -0700 Subject: [PATCH 14/22] Updated with concrete fastparquet version. 
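
Pinning to 2023.8.0 ties the xfail expectations above to one known
fastparquet release. A quick check that an environment matches the pin
(illustrative only):

    import fastparquet
    assert fastparquet.__version__ == "2023.8.0", fastparquet.__version__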
--- integration_tests/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/requirements.txt b/integration_tests/requirements.txt index eabd01ba8b7..f70a9f1fd39 100644 --- a/integration_tests/requirements.txt +++ b/integration_tests/requirements.txt @@ -17,4 +17,4 @@ pandas pyarrow pytest-xdist >= 2.0.0 findspark -fastparquet \ No newline at end of file +fastparquet == 2023.8.0 \ No newline at end of file From 4a67ef0432fe7969c3b66dcd6eb552b91639e6e6 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 6 Oct 2023 16:09:27 -0700 Subject: [PATCH 15/22] Fixed up some xfail messages. --- .../python/fastparquet_compatibility_test.py | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index 9df05d45afb..ab6dd95f3e3 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -45,9 +45,10 @@ def read_with_fastparquet_or_plugin(spark): return read_with_fastparquet_or_plugin -@pytest.mark.skipif(condition=spark_version() < "3.4.1", - reason="spark_df.to_pandas() is not reliable on prior versions of Spark. 3.4.1 and above seem to " - "work more stably.") +@pytest.mark.skipif(condition=spark_version() < "3.4.0", + reason="spark.createDataFrame(pandasDF) fails with prior versions of Spark: " + "\"AttributeError: 'DataFrame' object has no attribute 'iteritems'. " + "Did you mean: 'isetitem'?\"") @pytest.mark.parametrize('data_gen', [ ByteGen(nullable=False), ShortGen(nullable=False), @@ -55,8 +56,6 @@ def read_with_fastparquet_or_plugin(spark): pytest.param(IntegerGen(nullable=True), marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")), LongGen(nullable=False), - pytest.param(LongGen(nullable=True), - marks=pytest.mark.xfail(reason="Nullables cause merge errors, when converting to Spark dataframe")), FloatGen(nullable=False), DoubleGen(nullable=False), StringGen(nullable=False), @@ -84,7 +83,13 @@ def read_with_fastparquet_or_plugin(spark): StructGen(children=[("first", IntegerGen(nullable=False)), ("second", FloatGen(nullable=False))], nullable=False), marks=pytest.mark.xfail(reason="Values are correct, but struct row representations differ between " - "fastparquet and Spark.")) + "fastparquet and Spark. E.g.:\n" + "--- CPU OUTPUT\n" + "+++ GPU OUTPUT\n" + "@@ -1 +1 @@\n" + "-Row(a.first=123, a.second=456)\n" + "+Row(a=Row(first=123, second=456))\n" + "See https://github.com/NVIDIA/spark-rapids/issues/9399.")) ], ids=idfn) def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path): """ @@ -105,9 +110,10 @@ def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path): assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path)) -@pytest.mark.skipif(condition=spark_version() < "3.4.1", - reason="spark_df.to_pandas() is not reliable on prior versions of Spark. 3.4.1 and above seem to " - "work more stably.") +@pytest.mark.skipif(condition=spark_version() < "3.4.0", + reason="spark.createDataFrame(pandasDF) fails with prior versions of Spark: " + "\"AttributeError: 'DataFrame' object has no attribute 'iteritems'. 
" + "Did you mean: 'isetitem'?\"") @pytest.mark.parametrize('column_gen', [ ByteGen(nullable=False), ShortGen(nullable=False), From b6b7d190d211f37e382e44f872929ee0bafb7dab Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 9 Oct 2023 11:15:45 -0700 Subject: [PATCH 16/22] Fixed another xfail message. --- .../src/main/python/fastparquet_compatibility_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index ab6dd95f3e3..800def592a6 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -77,8 +77,8 @@ def read_with_fastparquet_or_plugin(spark): marks=pytest.mark.xfail(reason="fastparquet reads timestamps preceding 1900 incorrectly.")), pytest.param( ArrayGen(child_gen=IntegerGen(nullable=False), nullable=False), - marks=pytest.mark.xfail(reason="Conversion from Pandas dataframe to Spark dataframe fails: " - "\"Unable to infer the type of the field a\".")), + marks=pytest.mark.xfail(reason="Conversion from Pandas dataframe (read with fastparquet) to Spark dataframe " + "fails: \"Unable to infer the type of the field a\".")), pytest.param( StructGen(children=[("first", IntegerGen(nullable=False)), ("second", FloatGen(nullable=False))], nullable=False), From c9f7e2c6da831cbbab58444d4d0f397fede08963 Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 9 Oct 2023 12:33:49 -0700 Subject: [PATCH 17/22] Extend date/time margins to Pandas.Timestamp.min and Pandas.Timestamp.max. --- .../python/fastparquet_compatibility_test.py | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index 800def592a6..505d8621f5f 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -24,6 +24,13 @@ 'spark.sql.parquet.int96RebaseModeInWrite': 'CORRECTED' } +pandas_min_date = date(year=1677, month=9, day=22) # Pandas.Timestamp.min, rounded up. +pandas_max_date = date(year=2262, month=4, day=11) # Pandas.Timestamp.max, rounded down. +pandas_min_datetime = datetime(1677, 9, 21, 00, 12, 44, 0, + tzinfo=timezone.utc) # Pandas.Timestamp.min, rounded up. +pandas_max_datetime = datetime(2262, 4, 11, 23, 47, 16, 0, + tzinfo=timezone.utc) # Pandas.Timestamp.max, rounded down. + def read_parquet(data_path): """ @@ -63,17 +70,17 @@ def read_with_fastparquet_or_plugin(spark): marks=pytest.mark.xfail(reason="fastparquet reads Decimal columns as Float, as per " "https://fastparquet.readthedocs.io/en/latest/details.html#data-types")), pytest.param(DateGen(nullable=False, - start=date(year=2020, month=1, day=1), - end=date(year=2020, month=12, day=31)), + start=pandas_min_date, + end=pandas_max_date), marks=pytest.mark.xfail(reason="fastparquet reads dates as timestamps.")), pytest.param(DateGen(nullable=False), marks=pytest.mark.xfail(reason="fastparquet reads far future dates (e.g. year=8705) incorrectly.")), TimestampGen(nullable=False, - start=datetime(2000, 1, 1, tzinfo=timezone.utc), - end=datetime(2200, 12, 31, tzinfo=timezone.utc)), # Vanilla case. + start=pandas_min_datetime, + end=pandas_max_datetime), # Vanilla case. 
pytest.param(TimestampGen(nullable=False, - start=datetime(1, 1, 1, tzinfo=timezone.utc), - end=datetime(1899, 12, 31, tzinfo=timezone.utc)), + start=pandas_min_datetime, + end=pandas_max_datetime), marks=pytest.mark.xfail(reason="fastparquet reads timestamps preceding 1900 incorrectly.")), pytest.param( ArrayGen(child_gen=IntegerGen(nullable=False), nullable=False), @@ -130,17 +137,17 @@ def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path): marks=pytest.mark.xfail(reason="fastparquet reads Decimal columns as Float, as per " "https://fastparquet.readthedocs.io/en/latest/details.html#data-types")), pytest.param(DateGen(nullable=False, - start=date(year=2020, month=1, day=1), - end=date(year=2020, month=12, day=31)), + start=pandas_min_date, + end=pandas_max_date), marks=pytest.mark.xfail(reason="fastparquet reads dates as timestamps.")), pytest.param(DateGen(nullable=False), marks=pytest.mark.xfail(reason="fastparquet reads far future dates (e.g. year=8705) incorrectly.")), TimestampGen(nullable=False, - start=datetime(2000, 1, 1, tzinfo=timezone.utc), - end=datetime(2200, 12, 31, tzinfo=timezone.utc)), # Vanilla case. + start=pandas_min_datetime, + end=pandas_max_datetime), # Vanilla case. pytest.param(TimestampGen(nullable=False, start=datetime(1, 1, 1, tzinfo=timezone.utc), - end=datetime(1899, 12, 31, tzinfo=timezone.utc)), + end=pandas_min_datetime), marks=pytest.mark.xfail(reason="fastparquet reads timestamps preceding 1900 incorrectly.")), ], ids=idfn) def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): @@ -192,8 +199,8 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): "See https://github.com/NVIDIA/spark-rapids/issues/9387.")), pytest.param( DateGen(nullable=False, - start=date(year=2000, month=1, day=1), - end=date(year=2020, month=12, day=31)), + start=pandas_min_date, + end=pandas_max_date), marks=pytest.mark.xfail(reason="spark_df.toPandas() problem: Dates generated in Spark can't be written " "with fastparquet, because the dtype/encoding cannot be deduced. " "This test has a workaround in test_reading_file_rewritten_with_fastparquet.")), @@ -205,8 +212,8 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): "This test has a workaround in test_reading_file_rewritten_with_fastparquet.")), pytest.param( TimestampGen(nullable=False, - start=datetime(2000, 1, 1, tzinfo=timezone.utc), - end=datetime(2200, 12, 31, tzinfo=timezone.utc)), + start=pandas_min_datetime, + end=pandas_max_datetime), marks=pytest.mark.xfail(reason="spark_df.toPandas() problem: Timestamps in Spark can't be " "converted to pandas, because of type errors. The error states: " "\"TypeError: Casting to unit-less dtype 'datetime64' is not supported. " @@ -247,8 +254,8 @@ def write_with_fastparquet(spark, data_gen): @pytest.mark.parametrize('column_gen, time_format', [ pytest.param( DateGen(nullable=False, - start=date(year=2000, month=1, day=1), - end=date(year=2020, month=12, day=31)), 'int64', + start=pandas_min_date, + end=pandas_max_date), 'int64', marks=pytest.mark.xfail(reason="Apache Spark and the plugin both have problems reading dates written via " "fastparquet, if written in int64: " "\"Illegal Parquet type: INT64 (TIMESTAMP(NANOS,false)).\"")), From fceafc2442c2209112d0f7e9c579a2a3667cfc61 Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 9 Oct 2023 14:23:30 -0700 Subject: [PATCH 18/22] Added dependency to CI scripts, Docker images. 
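
Each CI environment (the conda-based Docker images and the pip-based
Databricks setup) now pulls in fastparquet. A hypothetical smoke test to
fail fast if an image missed the dependency:

    # Run inside the image before the integration tests start.
    try:
        import fastparquet  # noqa: F401
    except ImportError as err:
        raise SystemExit(f"fastparquet not installed in this image: {err}")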
--- integration_tests/README.md | 3 +++ integration_tests/requirements.txt | 2 +- jenkins/Dockerfile-blossom.integration.centos | 1 + jenkins/Dockerfile-blossom.integration.rocky | 1 + jenkins/Dockerfile-blossom.integration.ubuntu | 1 + jenkins/Dockerfile-blossom.ubuntu | 2 +- jenkins/databricks/init_cudf_udf.sh | 1 + jenkins/databricks/setup.sh | 2 +- 8 files changed, 10 insertions(+), 3 deletions(-) diff --git a/integration_tests/README.md b/integration_tests/README.md index 41d4c53861d..bbb601daaf1 100644 --- a/integration_tests/README.md +++ b/integration_tests/README.md @@ -101,6 +101,9 @@ For manual installation, you need to setup your environment: tests across multiple CPUs to speed up test execution - findspark : Adds pyspark to sys.path at runtime +- [fastparquet](https://fastparquet.readthedocs.io) + : A Python library (independent of Apache Spark) for reading/writing Parquet. Used in the + integration tests for checking Parquet read/write compatibility with the RAPIDS plugin. You can install all the dependencies using `pip` by running the following command: diff --git a/integration_tests/requirements.txt b/integration_tests/requirements.txt index f70a9f1fd39..e8ba7a3e2f1 100644 --- a/integration_tests/requirements.txt +++ b/integration_tests/requirements.txt @@ -17,4 +17,4 @@ pandas pyarrow pytest-xdist >= 2.0.0 findspark -fastparquet == 2023.8.0 \ No newline at end of file +fastparquet >= 2023.8.0 \ No newline at end of file diff --git a/jenkins/Dockerfile-blossom.integration.centos b/jenkins/Dockerfile-blossom.integration.centos index f1bb9dc16da..c2d2d1d8fc8 100644 --- a/jenkins/Dockerfile-blossom.integration.centos +++ b/jenkins/Dockerfile-blossom.integration.centos @@ -59,6 +59,7 @@ RUN export CUDA_VER=`echo ${CUDA_VER} | cut -d '.' -f 1,2` && \ mamba install -y spacy && python -m spacy download en_core_web_sm && \ mamba install -y -c anaconda pytest requests && \ mamba install -y -c conda-forge sre_yield && \ + mamba install -y -c conda-forge fastparquet && \ conda clean -ay # install pytest plugins for xdist parallel run RUN python -m pip install findspark pytest-xdist pytest-order diff --git a/jenkins/Dockerfile-blossom.integration.rocky b/jenkins/Dockerfile-blossom.integration.rocky index a5f90a0271c..b72850f02e8 100644 --- a/jenkins/Dockerfile-blossom.integration.rocky +++ b/jenkins/Dockerfile-blossom.integration.rocky @@ -54,6 +54,7 @@ RUN export CUDA_VER=`echo ${CUDA_VER} | cut -d '.' -f 1,2` && \ conda install -y spacy && python -m spacy download en_core_web_sm && \ conda install -y -c anaconda pytest requests && \ conda install -y -c conda-forge sre_yield && \ + conda install -y -c conda-forge fastparquet && \ conda clean -ay # install pytest plugins for xdist parallel run RUN python -m pip install findspark pytest-xdist pytest-order diff --git a/jenkins/Dockerfile-blossom.integration.ubuntu b/jenkins/Dockerfile-blossom.integration.ubuntu index 3a635465f64..34be07d1d8d 100644 --- a/jenkins/Dockerfile-blossom.integration.ubuntu +++ b/jenkins/Dockerfile-blossom.integration.ubuntu @@ -66,6 +66,7 @@ RUN export CUDA_VER=`echo ${CUDA_VER} | cut -d '.' 
-f 1,2` && \ conda install -y spacy && python -m spacy download en_core_web_sm && \ conda install -y -c anaconda pytest requests && \ conda install -y -c conda-forge sre_yield && \ + conda install -y -c conda-forge fastparquet && \ conda clean -ay # install pytest plugins for xdist parallel run RUN python -m pip install findspark pytest-xdist pytest-order diff --git a/jenkins/Dockerfile-blossom.ubuntu b/jenkins/Dockerfile-blossom.ubuntu index 0f7236272cb..d5dc731f254 100644 --- a/jenkins/Dockerfile-blossom.ubuntu +++ b/jenkins/Dockerfile-blossom.ubuntu @@ -58,7 +58,7 @@ RUN update-java-alternatives --set /usr/lib/jvm/java-1.8.0-openjdk-amd64 RUN ln -sfn /usr/bin/python3.8 /usr/bin/python RUN ln -sfn /usr/bin/python3.8 /usr/bin/python3 -RUN python -m pip install pytest sre_yield requests pandas pyarrow findspark pytest-xdist pre-commit pytest-order +RUN python -m pip install pytest sre_yield requests pandas pyarrow findspark pytest-xdist pre-commit pytest-order fastparquet # libnuma1 and libgomp1 are required by ucx packaging RUN apt install -y inetutils-ping expect wget libnuma1 libgomp1 diff --git a/jenkins/databricks/init_cudf_udf.sh b/jenkins/databricks/init_cudf_udf.sh index 298f82a6329..c516eed1ede 100755 --- a/jenkins/databricks/init_cudf_udf.sh +++ b/jenkins/databricks/init_cudf_udf.sh @@ -60,6 +60,7 @@ REQUIRED_PACKAGES=( pytest-xdist requests sre_yield + fastparquet ) ${base}/envs/cudf-udf/bin/mamba install -y \ diff --git a/jenkins/databricks/setup.sh b/jenkins/databricks/setup.sh index a1a9d03c900..3658e65d042 100755 --- a/jenkins/databricks/setup.sh +++ b/jenkins/databricks/setup.sh @@ -49,4 +49,4 @@ PYTHON_VERSION=$(${PYSPARK_PYTHON} -c 'import sys; print("python{}.{}".format(sy # Set the path of python site-packages, and install packages here. PYTHON_SITE_PACKAGES="$HOME/.local/lib/${PYTHON_VERSION}/site-packages" # Use "python -m pip install" to make sure pip matches with python. -$PYSPARK_PYTHON -m pip install --target $PYTHON_SITE_PACKAGES pytest sre_yield requests pandas pyarrow findspark pytest-xdist pytest-order +$PYSPARK_PYTHON -m pip install --target $PYTHON_SITE_PACKAGES pytest sre_yield requests pandas pyarrow findspark pytest-xdist pytest-order fastparquet From b10a3218c142ab880f1ad4a57c089a7483392d71 Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 9 Oct 2023 16:59:33 -0700 Subject: [PATCH 19/22] Change in tack: Install fastparquet explicitly. --- jenkins/Dockerfile-blossom.integration.centos | 3 +-- jenkins/Dockerfile-blossom.integration.rocky | 3 +-- jenkins/Dockerfile-blossom.integration.ubuntu | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/jenkins/Dockerfile-blossom.integration.centos b/jenkins/Dockerfile-blossom.integration.centos index c2d2d1d8fc8..d2092bdda66 100644 --- a/jenkins/Dockerfile-blossom.integration.centos +++ b/jenkins/Dockerfile-blossom.integration.centos @@ -59,10 +59,9 @@ RUN export CUDA_VER=`echo ${CUDA_VER} | cut -d '.' 
-f 1,2` && \ mamba install -y spacy && python -m spacy download en_core_web_sm && \ mamba install -y -c anaconda pytest requests && \ mamba install -y -c conda-forge sre_yield && \ - mamba install -y -c conda-forge fastparquet && \ conda clean -ay # install pytest plugins for xdist parallel run -RUN python -m pip install findspark pytest-xdist pytest-order +RUN python -m pip install findspark pytest-xdist pytest-order fastparquet # Set default java as 1.8.0 ENV JAVA_HOME "/usr/lib/jvm/java-1.8.0-openjdk" diff --git a/jenkins/Dockerfile-blossom.integration.rocky b/jenkins/Dockerfile-blossom.integration.rocky index b72850f02e8..6f8334a445c 100644 --- a/jenkins/Dockerfile-blossom.integration.rocky +++ b/jenkins/Dockerfile-blossom.integration.rocky @@ -54,10 +54,9 @@ RUN export CUDA_VER=`echo ${CUDA_VER} | cut -d '.' -f 1,2` && \ conda install -y spacy && python -m spacy download en_core_web_sm && \ conda install -y -c anaconda pytest requests && \ conda install -y -c conda-forge sre_yield && \ - conda install -y -c conda-forge fastparquet && \ conda clean -ay # install pytest plugins for xdist parallel run -RUN python -m pip install findspark pytest-xdist pytest-order +RUN python -m pip install findspark pytest-xdist pytest-order fastparquet # Set default java as 1.8.0 ENV JAVA_HOME "/usr/lib/jvm/java-1.8.0-openjdk" diff --git a/jenkins/Dockerfile-blossom.integration.ubuntu b/jenkins/Dockerfile-blossom.integration.ubuntu index 34be07d1d8d..2850fc5202d 100644 --- a/jenkins/Dockerfile-blossom.integration.ubuntu +++ b/jenkins/Dockerfile-blossom.integration.ubuntu @@ -66,10 +66,9 @@ RUN export CUDA_VER=`echo ${CUDA_VER} | cut -d '.' -f 1,2` && \ conda install -y spacy && python -m spacy download en_core_web_sm && \ conda install -y -c anaconda pytest requests && \ conda install -y -c conda-forge sre_yield && \ - conda install -y -c conda-forge fastparquet && \ conda clean -ay # install pytest plugins for xdist parallel run -RUN python -m pip install findspark pytest-xdist pytest-order +RUN python -m pip install findspark pytest-xdist pytest-order fastparquet RUN apt install -y inetutils-ping expect From f34eec711972f734f6a4fcd258def5be03e81760 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 10 Oct 2023 11:15:59 -0700 Subject: [PATCH 20/22] Per #8789, reverted change for Centos Dockerfile. Dockerfile-blossom.integration.centos is deprecated. --- jenkins/Dockerfile-blossom.integration.centos | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jenkins/Dockerfile-blossom.integration.centos b/jenkins/Dockerfile-blossom.integration.centos index d2092bdda66..f1bb9dc16da 100644 --- a/jenkins/Dockerfile-blossom.integration.centos +++ b/jenkins/Dockerfile-blossom.integration.centos @@ -61,7 +61,7 @@ RUN export CUDA_VER=`echo ${CUDA_VER} | cut -d '.' -f 1,2` && \ mamba install -y -c conda-forge sre_yield && \ conda clean -ay # install pytest plugins for xdist parallel run -RUN python -m pip install findspark pytest-xdist pytest-order fastparquet +RUN python -m pip install findspark pytest-xdist pytest-order # Set default java as 1.8.0 ENV JAVA_HOME "/usr/lib/jvm/java-1.8.0-openjdk" From 126b9f4dedacc1fe53addfe08f24e7150ce5b4e3 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 10 Oct 2023 11:27:41 -0700 Subject: [PATCH 21/22] Removed fastparquet from UDF tests. 
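
The cudf-udf pipeline does not run the fastparquet compatibility tests,
so the package is not needed in that conda environment. The next patch in
this series makes the tests skip themselves when fastparquet cannot be
imported, so nothing else depends on it being pre-installed there.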
--- jenkins/databricks/init_cudf_udf.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/jenkins/databricks/init_cudf_udf.sh b/jenkins/databricks/init_cudf_udf.sh index c516eed1ede..298f82a6329 100755 --- a/jenkins/databricks/init_cudf_udf.sh +++ b/jenkins/databricks/init_cudf_udf.sh @@ -60,7 +60,6 @@ REQUIRED_PACKAGES=( pytest-xdist requests sre_yield - fastparquet ) ${base}/envs/cudf-udf/bin/mamba install -y \ From fa356f82a2ef1569f208842da1cf0db1e7803843 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 10 Oct 2023 12:44:43 -0700 Subject: [PATCH 22/22] Optionally skips fastparquet tests. --- .../python/fastparquet_compatibility_test.py | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py index 505d8621f5f..6edc06c58e9 100644 --- a/integration_tests/src/main/python/fastparquet_compatibility_test.py +++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py @@ -16,9 +16,21 @@ from asserts import assert_gpu_and_cpu_are_equal_collect from data_gen import * -import fastparquet from spark_session import spark_version, with_cpu_session, with_gpu_session + +def fastparquet_unavailable(): + """ + Checks whether the `fastparquet` module is unavailable. Helps skip fastparquet tests. + :return: True, if fastparquet is not available. Else, False. + """ + try: + import fastparquet + return False + except ImportError: + return True + + rebase_write_corrected_conf = { 'spark.sql.parquet.datetimeRebaseModeInWrite': 'CORRECTED', 'spark.sql.parquet.int96RebaseModeInWrite': 'CORRECTED' @@ -42,6 +54,7 @@ def read_parquet(data_path): """ def read_with_fastparquet_or_plugin(spark): + import fastparquet plugin_enabled = spark.conf.get("spark.rapids.sql.enabled", "false") == "true" if plugin_enabled: return spark.read.parquet(data_path) @@ -52,6 +65,8 @@ def read_with_fastparquet_or_plugin(spark): return read_with_fastparquet_or_plugin +@pytest.mark.skipif(condition=fastparquet_unavailable(), + reason="fastparquet is required for testing fastparquet compatibility") @pytest.mark.skipif(condition=spark_version() < "3.4.0", reason="spark.createDataFrame(pandasDF) fails with prior versions of Spark: " "\"AttributeError: 'DataFrame' object has no attribute 'iteritems'. " @@ -117,6 +132,8 @@ def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path): assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path)) +@pytest.mark.skipif(condition=fastparquet_unavailable(), + reason="fastparquet is required for testing fastparquet compatibility") @pytest.mark.skipif(condition=spark_version() < "3.4.0", reason="spark.createDataFrame(pandasDF) fails with prior versions of Spark: " "\"AttributeError: 'DataFrame' object has no attribute 'iteritems'. " @@ -172,6 +189,8 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), conf=rebase_write_corrected_conf) +@pytest.mark.skipif(condition=fastparquet_unavailable(), + reason="fastparquet is required for testing fastparquet compatibility") @pytest.mark.parametrize('column_gen', [ ByteGen(nullable=False), ByteGen(nullable=True), @@ -234,6 +253,7 @@ def test_reading_file_written_with_fastparquet(column_gen, spark_tmp_path): data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH" def write_with_fastparquet(spark, data_gen): + import fastparquet # TODO: (future) Compression settings? 
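+        # (fastparquet.write also exposes a `compression` argument, e.g.
+        # compression='SNAPPY'; it is left at its default here until
+        # compression behaviour gets a test of its own.)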
dataframe = gen_df(spark, data_gen, 2048) fastparquet.write(data_path, dataframe.toPandas()) @@ -251,6 +271,8 @@ def write_with_fastparquet(spark, data_gen): rebase_write_corrected_conf) +@pytest.mark.skipif(condition=fastparquet_unavailable(), + reason="fastparquet is required for testing fastparquet compatibility") @pytest.mark.parametrize('column_gen, time_format', [ pytest.param( DateGen(nullable=False, @@ -313,6 +335,7 @@ def test_reading_file_rewritten_with_fastparquet(column_gen, time_format, spark_ data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH" def rewrite_with_fastparquet(spark, data_gen): + import fastparquet tmp_data_path = data_path + "_tmp" spark_df = gen_df(spark, data_gen, 2048) spark_df.repartition(1).write.mode("overwrite").parquet(tmp_data_path)
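
A minimal sketch of the round trip that this rewrite helper performs
(outside the Spark/pytest scaffolding, assuming only that `pandas` and
`fastparquet` are importable, and that `time_format` is one of the
parametrized 'int64'/'int96' values) would be:

    import fastparquet
    import pandas

    # Read the Spark-written Parquet back with pandas (side-stepping the
    # spark_df.toPandas() conversion problems described in the xfail
    # reasons above), then rewrite it with fastparquet, encoding
    # timestamps in the requested physical format.
    pandas_df = pandas.read_parquet(tmp_data_path)
    fastparquet.write(data_path, pandas_df, times=time_format)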