From d860ae5f6eea1fe601689e4b47e80a12fd1b1ed8 Mon Sep 17 00:00:00 2001 From: Arthur Shing Date: Tue, 16 Jul 2024 08:40:45 -0700 Subject: [PATCH] Reverting error_df optimization to fix #101 (#104) Co-authored-by: Arthur Shing --- spark_expectations/sinks/utils/writer.py | 57 +----------------------- 1 file changed, 1 insertion(+), 56 deletions(-) diff --git a/spark_expectations/sinks/utils/writer.py b/spark_expectations/sinks/utils/writer.py index 261ceb2..db12ee3 100644 --- a/spark_expectations/sinks/utils/writer.py +++ b/spark_expectations/sinks/utils/writer.py @@ -15,8 +15,6 @@ col, split, current_date, - monotonically_increasing_id, - coalesce, ) from pyspark.sql.types import StructType from spark_expectations import _log @@ -879,19 +877,6 @@ def write_error_records_final( ) -> Tuple[int, DataFrame]: try: _log.info("_write_error_records_final started") - df = df.withColumn("sequence_number", monotonically_increasing_id()) - - df_seq = df - - df = df.select( - "sequence_number", - *[ - dq_column - for dq_column in df.columns - if dq_column.startswith(f"{rule_type}") - ], - ) - df.cache() failed_records = [ f"size({dq_column}) != 0" @@ -929,26 +914,7 @@ def write_error_records_final( lit(self._context.get_run_date), ) ) - error_df_seq = df.filter(f"size(meta_{rule_type}_results) != 0") - - error_df = df_seq.join( - error_df_seq, - df_seq.sequence_number == error_df_seq.sequence_number, - "inner", - ) - - # sequence number column removing from the data frame - error_df_columns = [ - dq_column - for dq_column in error_df.columns - if ( - dq_column.startswith("sequence_number") - or dq_column.startswith(rule_type) - ) - is False - ] - - error_df = error_df.select(error_df_columns) + error_df = df.filter(f"size(meta_{rule_type}_results) != 0") self._context.print_dataframe_with_debugger(error_df) print( @@ -965,27 +931,6 @@ def write_error_records_final( # if _error_count > 0: self.generate_summarized_row_dq_res(error_df, rule_type) - # sequence number adding to dataframe for passing to action function - df = df_seq.join( - error_df_seq, - df_seq.sequence_number == error_df_seq.sequence_number, - "left", - ).withColumn( - f"meta_{rule_type}_results", - coalesce(col(f"meta_{rule_type}_results"), array()), - ) - - df = ( - df.select(error_df_columns) - .withColumn( - self._context.get_run_id_name, lit(self._context.get_run_id) - ) - .withColumn( - self._context.get_run_date_time_name, - lit(self._context.get_run_date), - ) - ) - _log.info("_write_error_records_final ended") return _error_count, df