Feature/spark expectation enhancements#123 (#125)

1 change: 1 addition & 0 deletions CONTRIBUTORS.md
@@ -14,6 +14,7 @@ Thanks to the contributors who helped on this project apart from the authors
* [Vigneshwarr Venkatesan](https://www.linkedin.com/in/vignesh15)
* [Nishant Singh](https://www.linkedin.com/in/singh-nishant/)
* [Amaldev Kunnel](https://www.linkedin.com/in/amaldev-k-40222680)
* [Raghavendra H S](https://www.linkedin.com/in/raghavendra-h-s-01786332/)
* [Sudeepta pal](https://www.linkedin.com/in/sudeepta-pal-98b393217/)
* [Mallikarjunudu Tirumalasetti](https://www.linkedin.com/in/mtirumal/)
* [Tadakala sai vamsi goud](https://www.linkedin.com/in/sai-vamsi-goud-455737169/)
1 change: 1 addition & 0 deletions docs/delta.md
@@ -19,6 +19,7 @@ builder = (
.config("spark.sql.warehouse.dir", "/tmp/hive/warehouse")
.config("spark.driver.extraJavaOptions", "-Dderby.system.home=/tmp/derby")
.config("spark.jars.ivy", "/tmp/ivy2")
.config("spark.databricks.delta.schema.autoMerge.enabled", "true")
)
spark = builder.getOrCreate()
```
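For orientation, a minimal sketch of a complete Delta-enabled builder with the newly added autoMerge flag; the app name and the extension/catalog settings are assumptions filled in around the lines shown in this hunk, not part of the diff:

```python
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("spark-expectations-delta-example")  # app name is illustrative
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.warehouse.dir", "/tmp/hive/warehouse")
    .config("spark.driver.extraJavaOptions", "-Dderby.system.home=/tmp/derby")
    .config("spark.jars.ivy", "/tmp/ivy2")
    # New in this change: lets Delta evolve target schemas on merge/append
    # (presumably so newly added stats columns merge into existing tables).
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
)
# Delta jars must be resolvable (e.g. via delta-spark's configure_spark_with_delta_pip
# or spark.jars.packages); that wiring is omitted here.
spark = builder.getOrCreate()
```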
94 changes: 48 additions & 46 deletions docs/getting-started/setup.md
@@ -156,29 +156,30 @@ product_id string, -- (2)!
table_name string, -- (3)!
rule_type string, -- (4)!
rule string, -- (5)!
source_expectations string, -- (6)!
tag string, -- (7)!
description string, -- (8)!
source_dq_status string, -- (9)!
source_dq_actual_outcome string, -- (10)!
source_dq_expected_outcome string, -- (11)!
source_dq_actual_row_count string, -- (12)!
source_dq_error_row_count string, -- (13)!
source_dq_row_count string, -- (14)!
source_dq_start_time string, -- (15)!
source_dq_end_time string, -- (16)!
target_expectations string, -- (17)!
target_dq_status string, -- (18)!
target_dq_actual_outcome string, -- (19)!
target_dq_expected_outcome string, -- (20)!
target_dq_actual_row_count string, -- (21)!
target_dq_error_row_count string, -- (22)!
target_dq_row_count string, -- (23)!
target_dq_start_time string, -- (24)!
target_dq_end_time string, -- (25)!
dq_date date, -- (26)!
dq_time string, -- (27)!
dq_job_metadata_info string, -- (28)!
column_name string, -- (6)!
source_expectations string, -- (7)!
tag string, -- (8)!
description string, -- (9)!
source_dq_status string, -- (10)!
source_dq_actual_outcome string, -- (11)!
source_dq_expected_outcome string, -- (12)!
source_dq_actual_row_count string, -- (13)!
source_dq_error_row_count string, -- (14)!
source_dq_row_count string, -- (15)!
source_dq_start_time string, -- (16)!
source_dq_end_time string, -- (17)!
target_expectations string, -- (18)!
target_dq_status string, -- (19)!
target_dq_actual_outcome string, -- (20)!
target_dq_expected_outcome string, -- (21)!
target_dq_actual_row_count string, -- (22)!
target_dq_error_row_count string, -- (23)!
target_dq_row_count string, -- (24)!
target_dq_start_time string, -- (25)!
target_dq_end_time string, -- (26)!
dq_date date, -- (27)!
dq_time string, -- (28)!
dq_job_metadata_info string, -- (29)!
);
```

@@ -187,26 +188,27 @@
3. `table_name` The target table where the final data gets inserted
4. `rule_type` Either row/query/agg dq
5. `rule` Rule name
6. `source_expectations` Actual Rule to be executed on the source dq
7. `tag` completeness,uniqueness,validity,accuracy,consistency,
8. `description` Description of the Rule
9. `source_dq_status` Status of the rule execution in the Source dq
10. `source_dq_actual_outcome` Actual outcome of the Source dq check
11. `source_dq_expected_outcome` Expected outcome of the Source dq check
12. `source_dq_actual_row_count` Number of rows of the source dq
13. `source_dq_error_row_count` Number of rows failed in the source dq
14. `source_dq_row_count` Number of rows of the source dq
15. `source_dq_start_time` source dq start timestamp
16. `source_dq_end_time` source dq end timestamp
17. `target_expectations` Actual Rule to be executed on the target dq
18. `target_dq_status` Status of the rule execution in the Target dq
19. `target_dq_actual_outcome` Actual outcome of the Target dq check
20. `target_dq_expected_outcome` Expected outcome of the Target dq check
21. `target_dq_actual_row_count` Number of rows of the target dq
22. `target_dq_error_row_count` Number of rows failed in the target dq
23. `target_dq_row_count` Number of rows of the target dq
24. `target_dq_start_time` target dq start timestamp
25. `target_dq_end_time` target dq end timestamp
26. `dq_date` Dq executed date
27. `dq_time` Dq executed timestamp
28. `dq_job_metadata_info` dq job metadata
6. `column_name` Column name on which the rule was executed
7. `source_expectations` Actual Rule to be executed on the source dq
8. `tag` One of completeness, uniqueness, validity, accuracy, consistency
9. `description` Description of the Rule
10. `source_dq_status` Status of the rule execution in the Source dq
11. `source_dq_actual_outcome` Actual outcome of the Source dq check
12. `source_dq_expected_outcome` Expected outcome of the Source dq check
13. `source_dq_actual_row_count` Number of rows of the source dq
14. `source_dq_error_row_count` Number of rows failed in the source dq
15. `source_dq_row_count` Number of rows of the source dq
16. `source_dq_start_time` source dq start timestamp
17. `source_dq_end_time` source dq end timestamp
18. `target_expectations` Actual Rule to be executed on the target dq
19. `target_dq_status` Status of the rule execution in the Target dq
20. `target_dq_actual_outcome` Actual outcome of the Target dq check
21. `target_dq_expected_outcome` Expected outcome of the Target dq check
22. `target_dq_actual_row_count` Number of rows of the target dq
23. `target_dq_error_row_count` Number of rows failed in the target dq
24. `target_dq_row_count` Number of rows of the target dq
25. `target_dq_start_time` target dq start timestamp
26. `target_dq_end_time` target dq end timestamp
27. `dq_date` Dq executed date
28. `dq_time` Dq executed timestamp
29. `dq_job_metadata_info` dq job metadata
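
A hedged query sketch for inspecting the new `column_name` field after a run; the table name `dq_spark_dev.dq_stats_detailed` and the `'fail'` status literal are taken from the examples in this PR and may need adjusting for your environment:

```python
# Hedged sketch: list failed source-side rules and the columns they ran against.
# Assumes a detailed stats table created with the DDL above.
failed_rules = spark.sql(
    """
    SELECT rule, rule_type, column_name, source_dq_status, source_dq_error_row_count
    FROM dq_spark_dev.dq_stats_detailed
    WHERE source_dq_status = 'fail'
    """
)
failed_rules.show(truncate=False)
```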
3 changes: 2 additions & 1 deletion spark_expectations/config/user_config.py
@@ -5,6 +5,7 @@
class Constants:
# declare const user config variables for email notification
se_notifications_enable_email = "spark.expectations.notifications.email.enabled"
se_enable_observability = "spark.expectations.observability.enabled"
se_notifications_enable_custom_email_body = (
"spark.expectations.notifications.enable.custom.email.body"
)
@@ -16,7 +17,7 @@ class Constants:
)
se_notifications_email_from = "spark.expectations.notifications.email.from"
se_notifications_email_to_other_mail_id = (
"spark.expectations.notifications.email.to.other.mail.com"
"sudeepta.pal@nike.com"
)
se_notifications_email_subject = "spark.expectations.notifications.email.subject"
se_notifications_email_custom_body = (
44 changes: 43 additions & 1 deletion spark_expectations/core/context.py
@@ -25,6 +25,7 @@ def __post_init__(self) -> None:
self._run_id: str = f"{self.product_id}_{uuid1()}"
self._run_date: str = self.set_run_date()
self._dq_stats_table_name: Optional[str] = None
self._dq_stats_report_table_name: Optional[str] = None
self._dq_detailed_stats_table_name: Optional[str] = None
self._final_table_name: Optional[str] = None
self._error_table_name: Optional[str] = None
@@ -142,6 +143,7 @@ def __post_init__(self) -> None:

self._target_and_error_table_writer_config: dict = {}
self._stats_table_writer_config: dict = {}
self._report_table_config: dict = {}

# The below config is user config and will be enabled if detailed result is required for agg and query dq
self._enable_agg_dq_detailed_result: bool = False
@@ -196,6 +198,25 @@ def get_dq_stats_table_name(self) -> str:
"""The spark expectations context is not set completely, please assign '_dq_stats_table_name' before
accessing it"""
)
def set_dq_stats_report_table_name(self, dq_stats_report_table_name: str) -> None:
self._dq_stats_report_table_name = dq_stats_report_table_name

@property
def get_dq_stats_report_table_name(self) -> str:
"""
Get dq_stats_report_table_name to which the report data of the dq job will be written

Returns:
str: returns the dq_stats_report_table_name
"""
if self._dq_stats_report_table_name:
return self._dq_stats_report_table_name
raise SparkExpectationsMiscException(
"""The spark expectations context is not set completely, please assign '_dq_stats_report_table_name' before
accessing it"""
)

def set_dq_expectations(self, dq_expectations: dict) -> None:
self._dq_expectations = dq_expectations
@@ -1844,7 +1865,7 @@ def set_dq_detailed_stats_table_name(
self._dq_detailed_stats_table_name = dq_detailed_stats_table_name

@property
def get_dq_detailed_stats_table_name(self) -> str:
"""
Get dq_stats_table_name to which the final stats of the dq job will be written into

@@ -1885,6 +1906,23 @@ def get_query_dq_output_custom_table_name(self) -> str:
'_dq_detailed_stats_table_name,query_dq_detailed_stats_status' before
accessing it"""
)
def set_report_table_config(self, config: dict) -> None:
"""
This function sets report table config
Args:
config: dict
Returns: None
"""
self._report_table_config = config

@property
def get_report_table_config(self) -> dict:
"""
This function returns the report table config
Returns:
dict: returns report_table_config, which is a dict
"""
return self._report_table_config

def set_detailed_stats_table_writer_config(self, config: dict) -> None:
"""
@@ -2062,3 +2100,7 @@ def get_stats_dict(self) -> Optional[List[Dict[str, Any]]]:
Optional[List[Dict[str, Any]]]: Returns the stats_dict if it exists, otherwise None
"""
return self._stats_dict if hasattr(self, "_stats_dict") else None

@property
def report_table_config(self) -> dict:
return self._report_table_config
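
A hedged usage sketch of the context methods added above; the table name comes from the sample script in this PR, while the writer options are illustrative assumptions rather than documented defaults:

```python
# context is assumed to be a SparkExpectationsContext instance.
context.set_dq_stats_report_table_name("dq_spark_dev.dq_obs_report_data")
context.set_report_table_config({"mode": "append", "format": "delta"})  # illustrative options

report_table = context.get_dq_stats_report_table_name  # raises SparkExpectationsMiscException if unset
report_writer_config = context.get_report_table_config  # plain dict; {} when never set
```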
7 changes: 6 additions & 1 deletion spark_expectations/core/expectations.py
@@ -358,7 +358,12 @@ def _except(func: Any) -> Any:
else False
)

_job_metadata: str = user_config.se_job_metadata
_job_metadata: Optional[str] = (
str(_notification_dict[user_config.se_job_metadata])
if isinstance(_notification_dict.get(user_config.se_job_metadata), str)
else None
)

notifications_on_error_drop_threshold = _notification_dict.get(
user_config.se_notifications_on_error_drop_threshold, 100
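
A hedged sketch of how job metadata could be supplied so the lookup above resolves to a string; the metadata keys are illustrative:

```python
from spark_expectations.config.user_config import Constants as user_config

# se_job_metadata must map to a string (e.g. a stringified dict); anything else
# makes the lookup above fall back to None.
job_info = str({"job": "example_job", "Region": "NA", "Snapshot": "2024-04-15"})

user_conf = {
    user_config.se_job_metadata: job_info,
}
```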
1 change: 1 addition & 0 deletions spark_expectations/examples/base_setup.py
@@ -134,6 +134,7 @@ def set_up_delta() -> SparkSession:
.config("spark.sql.warehouse.dir", "/tmp/hive/warehouse")
.config("spark.driver.extraJavaOptions", "-Dderby.system.home=/tmp/derby")
.config("spark.jars.ivy", "/tmp/ivy2")
.config("spark.databricks.delta.schema.autoMerge.enabled", "true")
)
spark = builder.getOrCreate()

26 changes: 16 additions & 10 deletions spark_expectations/examples/sample_dq_delta.py
@@ -15,8 +15,9 @@

spark = set_up_delta()
dic_job_info = {
"job": "job_name",
"job": "na_CORL_DIGITAL_source_to_o9",
"Region": "NA",
"env": "dev",
"Snapshot": "2024-04-15",
}
job_info = str(dic_job_info)
@@ -32,12 +33,13 @@
)

user_conf = {
user_config.se_notifications_enable_email: True,
user_config.se_notifications_enable_custom_email_body: True,
user_config.se_notifications_email_smtp_host: "mailhost.com",
user_config.se_notifications_email_smtp_port: 25,
user_config.se_notifications_email_from: "",
user_config.se_notifications_email_to_other_mail_id: "",
user_config.se_enable_observability: "Enable",
user_config.se_notifications_enable_email: False,
user_config.se_notifications_enable_custom_email_body: False,
user_config.se_notifications_email_smtp_host: "smtp.office365.com",
user_config.se_notifications_email_smtp_port: 587,
user_config.se_notifications_email_from: "[email protected]",
user_config.se_notifications_email_to_other_mail_id: "[email protected]",
user_config.se_notifications_email_subject: "spark expectations - data quality - notifications",
user_config.se_notifications_email_custom_body: """Spark Expectations Statistics for this dq run:
'product_id': {},
@@ -116,12 +118,16 @@ def build_new() -> DataFrame:
spark.sql("select * from dq_spark_dev.dq_stats").show(truncate=False)
spark.sql("select * from dq_spark_dev.dq_stats_detailed").show(truncate=False)
spark.sql("select * from dq_spark_dev.dq_stats_querydq_output").show(truncate=False)
spark.sql("select * from dq_spark_dev.dq_stats").printSchema()
spark.sql("select * from dq_spark_dev.dq_stats_detailed").printSchema()
spark.sql("select * from dq_spark_dev.customer_order").show(truncate=False)
_log.info("BELOW IS THE REPORT TABLE")

spark.sql("select * from dq_spark_dev.dq_obs_report_data").show(truncate=False)

# spark.sql("select count(*) from dq_spark_local.customer_order_error ").show(
# truncate=False
# )
if user_conf.get(user_config.se_enable_observability) == "Enable":
    _log.info("alert_send_successfully")


_log.info("stats data in the kafka topic")
# display posted statistics from the kafka topic