Merge branch 'main' into main
asingamaneni authored Apr 18, 2024
2 parents d9280b3 + 002b742 commit 3a387e9
Showing 16 changed files with 527 additions and 158 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
@@ -13,6 +13,7 @@ Thanks to the contributors who helped on this project apart from the authors
* [Jagadapi Sivanaga Krishnam Raja Reddy](https://www.linkedin.com/in/jskrajareddy/)
* [Vigneshwarr Venkatesan](https://www.linkedin.com/in/vignesh15)
* [Amaldev Kunnel](https://www.linkedin.com/in/amaldev-k-40222680)
* [Nishant Singh](https://www.linkedin.com/in/singh-nishant/)

# Honorary Mentions
Thanks to the team below for invaluable insights and support throughout the initial release of this project
9 changes: 9 additions & 0 deletions docs/configurations/configure_rules.md
@@ -56,6 +56,15 @@ action_if_failed, tag, description, enable_for_source_dq_validation, enable_fo
--statistics table when the sum of the sales values falls below 10000
,('apla_nd', '`catalog`.`schema`.customer_order', 'agg_dq', 'sum_of_sales', 'sales', 'sum(sales)>10000', 'ignore',
'validity', 'sum of sales must be greater than 10000', true, true, true,false, 0,null, null)

--The aggregation rule is established on the 'sales' column and the metadata of the rule will be captured in the
--statistics table when the sum of the sales values falls outside the range 1000 to 10000
,('apla_nd', '`catalog`.`schema`.customer_order', 'agg_dq', 'sum_of_sales_range_type1', 'sales', 'sum(sales) between 1000 and 10000', 'ignore',
'validity', 'sum of sales must be between 1000 and 10000', true, true, true, false, 0, null, null)

--The aggregation rule is established on the 'sales' column and the metadata of the rule will be captured in the
--statistics table when the sum of the sales values does not satisfy sum(sales)>1000 and sum(sales)<10000
,('apla_nd', '`catalog`.`schema`.customer_order', 'agg_dq', 'sum_of_sales_range_type2', 'sales', 'sum(sales)>1000 and sum(sales)<10000', 'ignore', 'validity', 'sum of sales must be greater than 1000 and less than 10000', true, true, true, false, 0, null, null)

--The aggregation rule is established on the 'ship_mode' column and the metadata of the rule will be captured in
--the statistics table when distinct ship_mode greater than 3 and enabled for only source data set
1 change: 1 addition & 0 deletions docs/configurations/rules.md
@@ -47,6 +47,7 @@ Please find the different types of possible expectations
| Expect the maximum value in a column to fall within a specified range | expect_column_max_to_be_between | accuracy | ```max([col_name]) between [lower_bound] and [upper_bound]``` |
| Expect the minimum value in a column to fall within a specified range | expect_column_min_to_be_between | accuracy | ```min([col_name]) between [lower_bound] and [upper_bound]``` |
| Expect the row count of the dataset to fall within a certain range | expect_row_count_to_be_between | accuracy | ```count(*) between [lower_bound] and [upper_bound]``` |
| Expect the row count of the dataset to fall within a certain range, expressed as a pair of comparisons | expect_row_count_to_be_in_range | accuracy | ```count(*) > [lower_bound] and count(*) < [upper_bound]``` |


#### Possible Query Data Quality Expectations
196 changes: 135 additions & 61 deletions docs/getting-started/setup.md
@@ -20,25 +20,52 @@ your rules table for your project.

```sql
create table if not exists `catalog`.`schema`.`{product}_rules` (
product_id STRING, -- (1)!
table_name STRING, -- (2)!
rule_type STRING, -- (3)!
rule STRING, -- (4)!
column_name STRING, -- (5)!
expectation STRING, -- (6)!
action_if_failed STRING, -- (7)!
tag STRING, -- (8)!
description STRING, -- (9)!
enable_for_source_dq_validation BOOLEAN, -- (10)!
enable_for_target_dq_validation BOOLEAN, -- (11)!
is_active BOOLEAN, -- (12)!
enable_error_drop_alert BOOLEAN, -- (13)!
error_drop_threshold INT, -- (14)!
query_dq_delimiter STRING, -- (15)!
    enable_querydq_custom_output BOOLEAN  -- (16)!
);
```

1. `product_id` A unique name at the level of the DQ rules execution (for example, per product or pipeline)
2. `table_name` The table for which the rule is defined
3. `rule_type` One of three rule types: 'row_dq', 'agg_dq', or 'query_dq'
4. `rule` A short name identifying the rule
5. `column_name` The column for which the rule is defined. This applies only to row_dq; for agg_dq and query_dq, leave it blank/empty
6. `expectation` The DQ rule condition to evaluate
7. `action_if_failed` One of three actions: 'ignore', 'drop', or 'fail'.
Ignore: the rule runs and its output is logged; no action is taken whether the rule passes or fails. Applies to all three rule types.
Drop: rows that fail the rule are dropped from the dataset. Applies to the row_dq rule type only.
Fail: the DAG fails if the rule fails. Applies to all three rule types.
8. `tag` A tag categorizing the rule, e.g. completeness, validity, uniqueness
9. `description` A longer description of the rule
10. `enable_for_source_dq_validation` Flag to run the rule against the source dataset (applies to agg_dq and query_dq rules)
11. `enable_for_target_dq_validation` Flag to run the rule against the target dataset (applies to agg_dq and query_dq rules)
12. `is_active` true or false, indicating whether the rule is active
13. `enable_error_drop_alert` true or false; determines whether an alert notification is sent when rows are dropped from the dataset
14. `error_drop_threshold` Threshold of dropped rows at which the alert notification is triggered
15. `query_dq_delimiter` Delimiter used to segregate custom queries, e.g. '$' or '@'
16. `enable_querydq_custom_output` Whether custom query output should be written to a separate table (a complete example insert follows this list)
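For reference, here is a hypothetical insert of one complete rule row. This is a sketch: the rule name, condition, and catalog/schema placeholders are illustrative, and the column order follows the CREATE TABLE above.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical example rule; values map one-to-one to the 16 columns above.
spark.sql("""
    insert into `catalog`.`schema`.`apla_nd_rules` values (
        'apla_nd',                           -- product_id
        '`catalog`.`schema`.customer_order', -- table_name
        'row_dq',                            -- rule_type
        'sales_positive',                    -- rule
        'sales',                             -- column_name
        'sales > 0',                         -- expectation
        'drop',                              -- action_if_failed
        'validity',                          -- tag
        'sales must be greater than 0',      -- description
        true,                                -- enable_for_source_dq_validation
        true,                                -- enable_for_target_dq_validation
        true,                                -- is_active
        false,                               -- enable_error_drop_alert
        0,                                   -- error_drop_threshold
        null,                                -- query_dq_delimiter
        null                                 -- enable_querydq_custom_output
    )
""")
```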

The rule_type, enable_for_source_dq_validation, and enable_for_target_dq_validation columns together determine the source_agg_dq, target_agg_dq, source_query_dq, and target_query_dq stages. Please see the definitions below (and the sketch that follows):
If rule_type is row_dq, then row_dq is TRUE
If rule_type is agg_dq and enable_for_source_dq_validation is TRUE, then source_agg_dq is TRUE
If rule_type is agg_dq and enable_for_target_dq_validation is TRUE, then target_agg_dq is TRUE
If rule_type is query_dq and enable_for_source_dq_validation is TRUE, then source_query_dq is TRUE
If rule_type is query_dq and enable_for_target_dq_validation is TRUE, then target_query_dq is TRUE
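A minimal Python sketch of that mapping (the function and dict shape are illustrative, not the library's internal API):

```python
def dq_stages(rule: dict) -> dict:
    """Derive which DQ stages a rule participates in from its type and flags."""
    rule_type = rule["rule_type"]
    source = rule["enable_for_source_dq_validation"]
    target = rule["enable_for_target_dq_validation"]
    return {
        "row_dq": rule_type == "row_dq",
        "source_agg_dq": rule_type == "agg_dq" and source,
        "target_agg_dq": rule_type == "agg_dq" and target,
        "source_query_dq": rule_type == "query_dq" and source,
        "target_query_dq": rule_type == "query_dq" and target,
    }

# An agg_dq rule enabled for source validation only:
print(dq_stages({
    "rule_type": "agg_dq",
    "enable_for_source_dq_validation": True,
    "enable_for_target_dq_validation": False,
}))
# {'row_dq': False, 'source_agg_dq': True, 'target_agg_dq': False,
#  'source_query_dq': False, 'target_query_dq': False}
```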

### Rule Type For Rules

The rules table has a column called "rule_type". It is important that this column only accepts one of
@@ -72,58 +72,105 @@

```sql
create table if not exists `catalog`.`schema`.`dq_stats` (
product_id STRING, -- (1)!
table_name STRING, -- (2)!
input_count LONG, -- (3)!
error_count LONG, -- (4)!
output_count LONG, -- (5)!
output_percentage FLOAT, -- (6)!
success_percentage FLOAT, -- (7)!
error_percentage FLOAT, -- (8)!
source_agg_dq_results array<map<string, string>>, -- (9)!
final_agg_dq_results array<map<string, string>>, -- (10)!
source_query_dq_results array<map<string, string>>, -- (11)!
final_query_dq_results array<map<string, string>>, -- (12)!
row_dq_res_summary array<map<string, string>>, -- (13)!
row_dq_error_threshold array<map<string, string>>, -- (14)!
dq_status map<string, string>, -- (15)!
dq_run_time map<string, float>, -- (16)!
dq_rules map<string, map<string,int>>, -- (17)!
meta_dq_run_id STRING, -- (18)!
meta_dq_run_date DATE, -- (19)!
    meta_dq_run_datetime TIMESTAMP  -- (20)!
);
```

1. `product_id` A unique name at the level of the DQ rules execution
2. `table_name` The table for which the rules were run
3. `input_count` Total row count of the input dataframe
4. `error_count` Total count of rows that failed one or more row_dq rules
5. `output_count` Total count of records that passed the row_dq rules, or whose failures are configured to be ignored
6. `output_percentage` Percentage of records that passed the row_dq rules, or whose failures are configured to be ignored
7. `success_percentage` Percentage of records that passed the row_dq rules
8. `error_percentage` Percentage of records that failed the row_dq rules
9. `source_agg_dq_results` Results of the agg_dq rules run on the source dataset
10. `final_agg_dq_results` Results of the agg_dq rules run after the row_dq rules executed
11. `source_query_dq_results` Results of the query_dq rules run on the source dataset
12. `final_query_dq_results` Results of the query_dq rules run after the row_dq rules executed
13. `row_dq_res_summary` Summary of the row_dq results
14. `row_dq_error_threshold` Error thresholds for the row_dq rules, as defined in the rules table
15. `dq_status` Status of the rule execution for each stage
16. `dq_run_time` Time taken by each stage of rules
17. `dq_rules` Count of DQ rules executed in this run, by type
18. `meta_dq_run_id` Unique id generated for this run
19. `meta_dq_run_date` Date on which the rules were executed
20. `meta_dq_run_datetime` Date and time at which the rules were executed
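Once a run has completed, the stats table can be queried directly. A minimal sketch, assuming an active Spark session and the `dq_spark_local.dq_stats` table name used in this commit's example scripts:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Show row counts and pass/fail percentages for the most recent runs.
spark.sql("""
    select meta_dq_run_id, table_name, input_count, error_count,
           output_count, success_percentage, error_percentage, dq_status
    from dq_spark_local.dq_stats
    order by meta_dq_run_datetime desc
    limit 10
""").show(truncate=False)
```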

### DQ Detailed Stats Table

This table provides detailed stats for all the expectations, along with the status reported in the stats table, in a relational format.
This table need not be created manually: it is auto-created by appending "_detailed" to the DQ stats table name. It is optional and only gets created if detailed stats are enabled in the config, for example:
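A sketch of the relevant keys; the import path mirrors how the example scripts in this commit import the module, and the commented-out custom table name is an optional, hypothetical value:

```python
from spark_expectations.config.user_config import Constants as user_config

# Fragment of the configuration dictionary the example scripts build;
# these two keys enable the "_detailed" stats table.
user_conf = {
    user_config.se_enable_agg_dq_detailed_result: True,
    user_config.se_enable_query_dq_detailed_result: True,
    # Optional: route custom query_dq output to a specific table
    # (hypothetical name shown):
    # user_config.querydq_output_custom_table_name: "dq_spark_local.dq_custom_output",
}
```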
Below is the schema


```sql
create table if not exists `catalog`.`schema`.`dq_stats_detailed` (
run_id string, -- (1)!
product_id string, -- (2)!
table_name string, -- (3)!
rule_type string, -- (4)!
rule string, -- (5)!
source_expectations string, -- (6)!
tag string, -- (7)!
description string, -- (8)!
source_dq_status string, -- (9)!
source_dq_actual_outcome string, -- (10)!
source_dq_expected_outcome string, -- (11)!
source_dq_actual_row_count string, -- (12)!
source_dq_error_row_count string, -- (13)!
source_dq_row_count string, -- (14)!
target_expectations string, -- (15)!
target_dq_status string, -- (16)!
target_dq_actual_outcome string, -- (17)!
target_dq_expected_outcome string, -- (18)!
target_dq_actual_row_count string, -- (19)!
target_dq_error_row_count string, -- (20)!
target_dq_row_count string, -- (21)!
dq_date date, -- (22)!
    dq_time string  -- (23)!
);
```

1. `run_id` Run id for a specific run
2. `product_id` Unique product identifier
3. `table_name` The target table where the final data gets inserted
4. `rule_type` One of row_dq, agg_dq, or query_dq
5. `rule` Rule name
6. `source_expectations` The actual rule executed against the source
7. `tag` Tag for the rule: completeness, uniqueness, validity, accuracy, consistency, etc.
8. `description` Description of the rule
9. `source_dq_status` Status of the rule execution against the source
10. `source_dq_actual_outcome` Actual outcome of the source DQ check
11. `source_dq_expected_outcome` Expected outcome of the source DQ check
12. `source_dq_actual_row_count` Actual row count from the source DQ check
13. `source_dq_error_row_count` Number of rows that failed the source DQ check
14. `source_dq_row_count` Number of rows in the source DQ run
15. `target_expectations` The actual rule executed against the target
16. `target_dq_status` Status of the rule execution against the target
17. `target_dq_actual_outcome` Actual outcome of the target DQ check
18. `target_dq_expected_outcome` Expected outcome of the target DQ check
19. `target_dq_actual_row_count` Actual row count from the target DQ check
20. `target_dq_error_row_count` Number of rows that failed the target DQ check
21. `target_dq_row_count` Number of rows in the target DQ run
22. `dq_date` Date on which the DQ run executed
23. `dq_time` Timestamp at which the DQ run executed
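A sketch of reading this table to compare source and target outcomes rule by rule, assuming the `dq_spark_local.dq_stats_detailed` name that appears in the iceberg example below; `<run_id>` is a placeholder:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Compare source vs. target outcomes rule by rule for one run.
spark.sql("""
    select rule_type, rule,
           source_dq_status, source_dq_error_row_count,
           target_dq_status, target_dq_error_row_count
    from dq_spark_local.dq_stats_detailed
    where run_id = '<run_id>'
""").show(truncate=False)
```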
16 changes: 11 additions & 5 deletions spark_expectations/config/user_config.py
@@ -68,11 +68,17 @@ class Constants:
dbx_secret_token = "se.streaming.dbx.secret.token"
dbx_topic_name = "se.streaming.dbx.topic.name"

# declare const user config variables for agg query dq detailed stats
se_enable_agg_dq_detailed_result = "spark.expectations.agg.dq.detailed.stats"
se_enable_query_dq_detailed_result = "spark.expectations.query.dq.detailed.stats"

querydq_output_custom_table_name = "spark.expectations.query.dq.custom.table_name"

# declare const variable for agg query dq detailed stats
se_agg_dq_expectation_regex_pattern = (
r"(\(.+?\)|\w+\(.+?\))(\s*[<>!=]+\s*.+|\s*between\s*.+)$"
)
# declare const variable for range in agg query dq detailed stats
se_agg_dq_expectation_range_regex_pattern = (
r"(\w+\(\w+\)|\w+)(\s*[><]\s*\d+)\s+(and)\s+(\w+\(\w+\)|\w+)(\s*[><]\s*\d+)"
)
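
A quick illustration of what these patterns match, using expectations from the docs in this commit (the `Constants` import path mirrors how the example scripts import this module):

```python
import re

from spark_expectations.config.user_config import Constants as user_config

# The general pattern splits an aggregate expectation into the aggregate
# expression and its comparison, now including "between" ranges.
for expectation in ("sum(sales)>10000", "sum(sales) between 1000 and 10000"):
    match = re.match(user_config.se_agg_dq_expectation_regex_pattern, expectation)
    print(match.groups())
# ('sum(sales)', '>10000')
# ('sum(sales)', ' between 1000 and 10000')

# The range pattern decomposes a two-sided comparison into its two bounds.
match = re.match(
    user_config.se_agg_dq_expectation_range_regex_pattern,
    "sum(sales)>1000 and sum(sales)<10000",
)
print(match.groups())
# ('sum(sales)', '>1000', 'and', 'sum(sales)', '<10000')
```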
12 changes: 6 additions & 6 deletions spark_expectations/core/expectations.py
@@ -119,8 +119,8 @@ def _except(func: Any) -> Any:
user_config.se_notifications_on_fail: True,
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: False,
user_config.se_notifications_on_error_drop_threshold: 100,
user_config.se_enable_agg_dq_detailed_result: False,
user_config.se_enable_query_dq_detailed_result: False,
user_config.querydq_output_custom_table_name: f"{self.stats_table}_querydq_output",
}

@@ -165,18 +165,18 @@ def _except(func: Any) -> Any:
)

_agg_dq_detailed_stats: bool = (
bool(_notification_dict[user_config.se_enable_agg_dq_detailed_result])
if isinstance(
_notification_dict[user_config.se_enable_agg_dq_detailed_result],
bool,
)
else False
)

_query_dq_detailed_stats: bool = (
bool(_notification_dict[user_config.se_enable_query_dq_detailed_result])
if isinstance(
_notification_dict[user_config.se_enable_query_dq_detailed_result],
bool,
)
else False
2 changes: 2 additions & 0 deletions spark_expectations/examples/base_setup.py
@@ -31,6 +31,8 @@
,("your_product", "dq_spark_{env}.customer_order", "row_dq", "discount_threshold", "discount", "discount*100 < 60","drop", "validity", "discount should be less than 40", true, true, true, false, 0,null, null)
,("your_product", "dq_spark_{env}.customer_order", "row_dq", "ship_mode_in_set", "ship_mode", "lower(trim(ship_mode)) in('second class', 'standard class', 'standard class')", "drop", "validity", "ship_mode mode belongs in the sets", true, true, true, false, 0,null, null)
,("your_product", "dq_spark_{env}.customer_order", "row_dq", "profit_threshold", "profit", "profit>0", "ignore", "validity", "profit threshold should be greater tahn 0", false, true, true, true, 0,null, null)
,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_sales_range type 1", "sales", "sum(sales)>99 and sum(sales)<99999", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0, null, true)
,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_sales_range type 2", "sales", "sum(sales) between 100 and 10000 ", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0, null, true)
,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", "sales", "sum(sales)>10000", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0,null, null)
,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_quantity", "quantity", "sum(quantity)>10000", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0,null, null)
,("your_product", "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", "*", "((select count(*) from ({source_f1}) a) - (select count(*) from ({target_f1}) b) ) < 3$source_f1$select distinct product_id,order_id from order_source$target_f1$select distinct product_id,order_id from order_target", "ignore", "validity", "row count threshold", true, true, true, false, 0,null, true)
2 changes: 2 additions & 0 deletions spark_expectations/examples/sample_dq_bigquery.py
@@ -56,6 +56,8 @@
user_config.se_notifications_on_fail: True,
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
user_config.se_notifications_on_error_drop_threshold: 15,
user_config.se_enable_query_dq_detailed_result: True,
user_config.se_enable_agg_dq_detailed_result: True,
user_config.se_enable_error_table: True,
user_config.se_dq_rules_params: {
"env": "local",
4 changes: 2 additions & 2 deletions spark_expectations/examples/sample_dq_delta.py
@@ -39,8 +39,8 @@
user_config.se_notifications_on_fail: True,
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
user_config.se_notifications_on_error_drop_threshold: 15,
user_config.se_enable_query_dq_detailed_result: True,
user_config.se_enable_agg_dq_detailed_result: True,
# user_config.querydq_output_custom_table_name: "dq_spark_local.dq_stats_detailed_outputt",
user_config.se_enable_error_table: True,
user_config.se_dq_rules_params: {
6 changes: 3 additions & 3 deletions spark_expectations/examples/sample_dq_iceberg.py
@@ -39,9 +39,9 @@
user_config.se_notifications_on_fail: True,
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
user_config.se_notifications_on_error_drop_threshold: 15,
user_config.se_enable_query_dq_detailed_result: True,
user_config.se_enable_agg_dq_detailed_result: True,
user_config.se_enable_error_table: True,
user_config.se_dq_rules_params: {
"env": "local",
"table": "product",
@@ -107,7 +107,7 @@ def build_new() -> DataFrame:

spark.sql("use dq_spark_local")
spark.sql("select * from dq_spark_local.dq_stats").show(truncate=False)
spark.sql("select * from dq_spark_local.dq_stats_custom").show(truncate=False)
spark.sql("select * from dq_spark_local.dq_stats_detailed").show(truncate=False)
spark.sql("select * from dq_spark_local.dq_stats_querydq_output").show(
truncate=False
)