diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 3020eecbaeb5..0d9b22ae3f9e 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -116,7 +116,6 @@ def wrapper(self, *args, **kwargs): 'quantile', 'describe', 'sem', - 'mad', 'skew', 'kurt', 'kurtosis', @@ -126,6 +125,10 @@ def wrapper(self, *args, **kwargs): 'cov', 'nunique', ] +# mad was removed in Pandas 2.0. +if PD_VERSION < (2, 0): + UNLIFTABLE_AGGREGATIONS.append('mad') + ALL_AGGREGATIONS = ( LIFTABLE_AGGREGATIONS + LIFTABLE_WITH_SUM_AGGREGATIONS + UNLIFTABLE_AGGREGATIONS) @@ -2092,7 +2095,9 @@ def axes(self): sum = _agg_method(pd.Series, 'sum') median = _agg_method(pd.Series, 'median') sem = _agg_method(pd.Series, 'sem') - mad = _agg_method(pd.Series, 'mad') + # mad was removed in Pandas 2.0. + if PD_VERSION < (2, 0): + mad = _agg_method(pd.Series, 'mad') argmax = frame_base.wont_implement_method( pd.Series, 'argmax', reason='order-sensitive') @@ -3914,10 +3919,12 @@ def pivot_helper(df): std = _agg_method(pd.DataFrame, 'std') var = _agg_method(pd.DataFrame, 'var') sem = _agg_method(pd.DataFrame, 'sem') - mad = _agg_method(pd.DataFrame, 'mad') skew = _agg_method(pd.DataFrame, 'skew') kurt = _agg_method(pd.DataFrame, 'kurt') kurtosis = _agg_method(pd.DataFrame, 'kurtosis') + # mad was removed in Pandas 2.0. + if PD_VERSION < (2, 0): + mad = _agg_method(pd.DataFrame, 'mad') take = frame_base.wont_implement_method(pd.DataFrame, 'take', reason='deprecated') @@ -4670,7 +4677,10 @@ def _is_unliftable(agg_func): return _check_str_or_np_builtin(agg_func, UNLIFTABLE_AGGREGATIONS) NUMERIC_AGGREGATIONS = ['max', 'min', 'prod', 'sum', 'mean', 'median', 'std', - 'var', 'sem', 'mad', 'skew', 'kurt', 'kurtosis'] + 'var', 'sem', 'skew', 'kurt', 'kurtosis'] +# mad was removed in Pandas 2.0. +if PD_VERSION < (2, 0): + NUMERIC_AGGREGATIONS.append('mad') def _is_numeric(agg_func): return _check_str_or_np_builtin(agg_func, NUMERIC_AGGREGATIONS) @@ -4698,7 +4708,6 @@ class _DeferredGroupByCols(frame_base.DeferredFrame): idxmax = frame_base._elementwise_method('idxmax', base=DataFrameGroupBy) idxmin = frame_base._elementwise_method('idxmin', base=DataFrameGroupBy) last = frame_base._elementwise_method('last', base=DataFrameGroupBy) - mad = frame_base._elementwise_method('mad', base=DataFrameGroupBy) max = frame_base._elementwise_method('max', base=DataFrameGroupBy) mean = frame_base._elementwise_method('mean', base=DataFrameGroupBy) median = frame_base._elementwise_method('median', base=DataFrameGroupBy) @@ -4717,8 +4726,11 @@ class _DeferredGroupByCols(frame_base.DeferredFrame): DataFrameGroupBy, 'tail', explanation=_PEEK_METHOD_EXPLANATION) take = frame_base.wont_implement_method( DataFrameGroupBy, 'take', reason='deprecated') - tshift = frame_base._elementwise_method('tshift', base=DataFrameGroupBy) var = frame_base._elementwise_method('var', base=DataFrameGroupBy) + # These already deprecated methods were removed in Pandas 2.0 + if PD_VERSION < (2, 0): + mad = frame_base._elementwise_method('mad', base=DataFrameGroupBy) + tshift = frame_base._elementwise_method('tshift', base=DataFrameGroupBy) @property # type: ignore @frame_base.with_docs_from(DataFrameGroupBy) diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index a2a8ef75f885..fa121aa85c30 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -1946,6 +1946,12 @@ def test_groupby_multiindex_keep_nans(self): lambda df: df.groupby(['foo', 'bar'], dropna=False).sum(), GROUPBY_DF) +NONPARALLEL_METHODS = ['quantile', 'describe', 'median', 'sem'] +# mad was removed in pandas 2 +if PD_VERSION < (2, 0): + NONPARALLEL_METHODS.append('mad') + + class AggregationTest(_AbstractFrameTest): """Tests for global aggregation methods on DataFrame/Series.""" @@ -1955,7 +1961,7 @@ class AggregationTest(_AbstractFrameTest): def test_series_agg(self, agg_method): s = pd.Series(list(range(16))) - nonparallel = agg_method in ('quantile', 'describe', 'median', 'sem', 'mad') + nonparallel = agg_method in NONPARALLEL_METHODS # TODO(https://github.com/apache/beam/issues/20926): max and min produce # the wrong proxy @@ -1974,7 +1980,7 @@ def test_series_agg(self, agg_method): def test_series_agg_method(self, agg_method): s = pd.Series(list(range(16))) - nonparallel = agg_method in ('quantile', 'describe', 'median', 'sem', 'mad') + nonparallel = agg_method in NONPARALLEL_METHODS # TODO(https://github.com/apache/beam/issues/20926): max and min produce # the wrong proxy @@ -1990,7 +1996,7 @@ def test_series_agg_method(self, agg_method): def test_dataframe_agg(self, agg_method): df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]}) - nonparallel = agg_method in ('quantile', 'describe', 'median', 'sem', 'mad') + nonparallel = agg_method in NONPARALLEL_METHODS # TODO(https://github.com/apache/beam/issues/20926): max and min produce # the wrong proxy @@ -2007,7 +2013,7 @@ def test_dataframe_agg(self, agg_method): def test_dataframe_agg_method(self, agg_method): df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]}) - nonparallel = agg_method in ('quantile', 'describe', 'median', 'sem', 'mad') + nonparallel = agg_method in NONPARALLEL_METHODS # TODO(https://github.com/apache/beam/issues/20926): max and min produce # the wrong proxy