Feature/spark expectations fixes (#117)
* Enhancement: Fixed duplicate-check issues in row_dq and added a new column to the stats_detailed table for improved observability.

Fix: Addressed duplicate-check issues in row_dq for improved data quality.
Feature: Added a new column to the stats_detailed table for enhanced observability.

* Added new column to stats_detailed table for enhanced observability.

* Added new column to stats_detailed table for enhanced observability.

* Feature: Added new column to stats_detailed table for enhanced observability.

* Update CONTRIBUTORS.md

* Add observability enhancement with additional column in stats_detailed table

Improves visibility and aids in quick resolution of data quality issues.

* reverting back additional column name changes

* reverting back additional column name changes

* reverting back additional column name changes

* reverting back additional column name changes

* Update README.md

* Delete README.md

* Adding the Spark Expectation Documentation under Docs Folder

* Delete docs/README.md

keeping the read_me file as it is

* keeping the readme files as they are.
Will update the documentation in a further release.

* updating the list.

---------

Co-authored-by: vtadak <[email protected]>
sudeep7978 and vtadak authored Dec 2, 2024
1 parent f83e469 commit 9500ed9
Showing 6 changed files with 48 additions and 68 deletions.
3 changes: 3 additions & 0 deletions CONTRIBUTORS.md
@@ -14,6 +14,9 @@ Thanks to the contributors who helped on this project apart from the authors
* [Vigneshwarr Venkatesan](https://www.linkedin.com/in/vignesh15)
* [Nishant Singh](https://www.linkedin.com/in/singh-nishant/)
* [Amaldev Kunnel](https://www.linkedin.com/in/amaldev-k-40222680)
* [Sudeepta pal](https://www.linkedin.com/in/sudeepta-pal-98b393217/)
* [Mallikarjunudu Tirumalasetti](https://www.linkedin.com/in/mtirumal/)
* [Tadakala sai vamsi goud](https://www.linkedin.com/in/sai-vamsi-goud-455737169/)

# Honorary Mentions
Thanks to the team below for invaluable insights and support throughout the initial release of this project
1 change: 0 additions & 1 deletion README.md
@@ -59,7 +59,6 @@ Please find the spark-expectations flow and feature diagrams below
<p align="center">
<img src=https://github.com/Nike-Inc/spark-expectations/blob/main/docs/se_diagrams/features.png?raw=true width=1000></p>


# Spark - Expectations Setup

### Configurations
106 changes: 41 additions & 65 deletions spark_expectations/sinks/utils/writer.py
@@ -174,68 +174,45 @@ def get_row_dq_detailed_stats(
_dq_res = {d["rule"]: d["failed_row_count"] for d in _row_dq_res}

for _rowdq_rule in _row_dq_expectations:
if _rowdq_rule["rule"] in _dq_res:
failed_row_count = _dq_res[_rowdq_rule["rule"]]
_row_dq_result.append(
# if _rowdq_rule["rule"] in _dq_res:

failed_row_count = _dq_res[_rowdq_rule["rule"]]
_row_dq_result.append(
(
_run_id,
_product_id,
_table_name,
_rowdq_rule["rule_type"],
_rowdq_rule["rule"],
_rowdq_rule["expectation"],
_rowdq_rule["tag"],
_rowdq_rule["description"],
"pass" if int(failed_row_count) == 0 else "fail",
None,
None,
(
(_input_count - int(failed_row_count))
if int(failed_row_count) != 0
else _input_count
),
failed_row_count,
_input_count,
(
_run_id,
_product_id,
_table_name,
_rowdq_rule["rule_type"],
_rowdq_rule["rule"],
_rowdq_rule["expectation"],
_rowdq_rule["tag"],
_rowdq_rule["description"],
"pass" if int(failed_row_count) == 0 else "fail",
None,
None,
(_input_count - int(failed_row_count)),
failed_row_count,
_input_count,
self._context.get_row_dq_start_time.replace(
tzinfo=timezone.utc
).strftime("%Y-%m-%d %H:%M:%S")
if self._context.get_row_dq_start_time
else "1900-01-01 00:00:00",
else "1900-01-01 00:00:00"
),
(
self._context.get_row_dq_end_time.replace(
tzinfo=timezone.utc
).strftime("%Y-%m-%d %H:%M:%S")
if self._context.get_row_dq_end_time
else "1900-01-01 00:00:00",
)
else "1900-01-01 00:00:00"
),
)
_row_dq_expectations.remove(_rowdq_rule)

for _rowdq_rule in _row_dq_expectations:
_row_dq_result.append(
(
_run_id,
_product_id,
_table_name,
_rowdq_rule["rule_type"],
_rowdq_rule["rule"],
_rowdq_rule["expectation"],
_rowdq_rule["tag"],
_rowdq_rule["description"],
"pass",
None,
None,
_input_count,
"0",
_input_count,
self._context.get_row_dq_start_time.replace(
tzinfo=timezone.utc
).strftime("%Y-%m-%d %H:%M:%S")
if self._context.get_row_dq_start_time
else "1900-01-01 00:00:00",
self._context.get_row_dq_end_time.replace(
tzinfo=timezone.utc
).strftime("%Y-%m-%d %H:%M:%S")
if self._context.get_row_dq_end_time
else "1900-01-01 00:00:00",
)
)

return _row_dq_result

except Exception as e:
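The hunk above consolidates the per-rule handling in get_row_dq_detailed_stats into a single pass over the row_dq expectations, instead of splitting rules between a "matched" branch and a second loop over the leftovers. Below is a minimal, standalone sketch of that pattern; the function name `build_row_dq_detailed_stats`, its parameters, and the trimmed output tuple are illustrative assumptions, not the library's actual signature.

```python
from datetime import datetime, timezone
from typing import Dict, List, Tuple

def build_row_dq_detailed_stats(
    run_id: str,
    product_id: str,
    table_name: str,
    row_dq_expectations: List[Dict[str, str]],
    row_dq_res: List[Dict[str, str]],
    input_count: int,
) -> List[Tuple]:
    """Illustrative single-pass aggregation: one detailed-stats row per rule."""
    # Map each rule name to its failed row count, defaulting to 0 so that
    # rules with no recorded failures are still reported exactly once.
    failed_counts = {
        d["rule"]: int(d.get("failed_row_count", 0)) for d in row_dq_res
    }

    results = []
    for rule in row_dq_expectations:
        failed = failed_counts.get(rule["rule"], 0)
        results.append(
            (
                run_id,
                product_id,
                table_name,
                rule["rule_type"],
                rule["rule"],
                rule["expectation"],
                "pass" if failed == 0 else "fail",
                input_count - failed,  # rows that satisfied the rule
                failed,                # rows that violated the rule
                input_count,
                datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
            )
        )
    return results
```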
@@ -377,8 +354,7 @@ def _prep_secondary_query_output(self) -> DataFrame:
+ "target.source_dq as target_output from _df_custom_detailed_stats_source as source "
+ "left outer join _df_custom_detailed_stats_source as target "
+ "on source.run_id=target.run_id and source.product_id=target.product_id and "
+ "source.table_name=target.table_name and source.rule=target.rule "
+ "and source.dq_type = target.dq_type "
+ "source.table_name=target.table_name and source.rule=target.rule "
+ "and source.alias_comp=target.alias_comp "
+ "and source.compare = 'source' and target.compare = 'target' "
)
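The hunk above adjusts the self-join that pairs source-side and target-side detailed stats for the same rule. The sketch below shows that kind of self-join in one piece; the temp view and column names mirror the diff, but the query as a whole is a simplified illustration rather than the project's exact SQL.

```python
# Illustrative self-join pairing source and target detailed stats for the
# same run, product, table, rule, and alias. The view and column names are
# taken from the diff above; the full query is a hedged sketch.
secondary_query_sql = """
SELECT
    source.run_id,
    source.product_id,
    source.table_name,
    source.rule,
    source.source_dq AS source_output,
    target.source_dq AS target_output
FROM _df_custom_detailed_stats_source AS source
LEFT OUTER JOIN _df_custom_detailed_stats_source AS target
    ON source.run_id = target.run_id
   AND source.product_id = target.product_id
   AND source.table_name = target.table_name
   AND source.rule = target.rule
   AND source.alias_comp = target.alias_comp
   AND source.compare = 'source'
   AND target.compare = 'target'
"""
# df = spark.sql(secondary_query_sql)  # assuming the temp view is registered
```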
@@ -629,18 +605,18 @@ def write_error_stats(self) -> None:
input_count: int = self._context.get_input_count
error_count: int = self._context.get_error_count
output_count: int = self._context.get_output_count
source_agg_dq_result: Optional[
List[Dict[str, str]]
] = self._context.get_source_agg_dq_result
final_agg_dq_result: Optional[
List[Dict[str, str]]
] = self._context.get_final_agg_dq_result
source_query_dq_result: Optional[
List[Dict[str, str]]
] = self._context.get_source_query_dq_result
final_query_dq_result: Optional[
List[Dict[str, str]]
] = self._context.get_final_query_dq_result
source_agg_dq_result: Optional[List[Dict[str, str]]] = (
self._context.get_source_agg_dq_result
)
final_agg_dq_result: Optional[List[Dict[str, str]]] = (
self._context.get_final_agg_dq_result
)
source_query_dq_result: Optional[List[Dict[str, str]]] = (
self._context.get_source_query_dq_result
)
final_query_dq_result: Optional[List[Dict[str, str]]] = (
self._context.get_final_query_dq_result
)

error_stats_data = [
(
2 changes: 1 addition & 1 deletion spark_expectations/utils/actions.py
@@ -365,7 +365,7 @@ def execute_sql_and_get_result(
f"SELECT ({query}) AS OUTPUT"
).collect()[0][0]
if query
else None
else 0
)

# function to get the query outputs
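The change above makes the scalar-query helper fall back to 0 rather than None when no query is supplied, so downstream comparisons and arithmetic stay well-defined. A minimal sketch of that behavior follows; the function name `execute_scalar_query` and its signature are assumptions for illustration, not the library's API.

```python
from typing import Any, Optional
from pyspark.sql import SparkSession

def execute_scalar_query(spark: SparkSession, query: Optional[str]) -> Any:
    """Illustrative helper: evaluate a scalar sub-query, defaulting to 0."""
    if not query:
        # Returning 0 instead of None (mirroring the change in actions.py)
        # avoids null-checks when a rule carries no query to evaluate.
        return 0
    return spark.sql(f"SELECT ({query}) AS OUTPUT").collect()[0][0]

# Example usage (hypothetical table name):
# count = execute_scalar_query(spark, "select count(*) from my_table")
```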
1 change: 1 addition & 0 deletions tests/sinks/utils/test_writer.py
@@ -1,3 +1,4 @@

import os
import unittest.mock
from datetime import datetime
3 changes: 2 additions & 1 deletion tests/utils/test_actions.py
@@ -1,3 +1,4 @@

from unittest.mock import Mock
from unittest.mock import patch

@@ -1582,4 +1583,4 @@ def test_run_dq_rules_condition_expression_dynamic_exception(_fixture_df,
match=r"error occurred while running expectations .*"):
_rule_type= _rule_test.get("rule_type")
SparkExpectationsActions.run_dq_rules(_fixture_mock_context, _fixture_df, _expectations,
_rule_type, False, True)
_rule_type, False, True)
