Drop TempView is not supported on latest Databricks clusters and Naming conventions (#85)

* naming corrections and removing dropTemp in reader.py

* correcting the documentation

---------

Co-authored-by: krishnam Jagadapi <[email protected]>
Co-authored-by: Ashok Singamaneni <[email protected]>
3 people authored Apr 9, 2024
1 parent c83762c commit 19ed71a
Showing 10 changed files with 201 additions and 101 deletions.
196 changes: 135 additions & 61 deletions docs/getting-started/setup.md
@@ -20,25 +20,52 @@ your rules table for your project.

```sql
create table if not exists `catalog`.`schema`.`{product}_rules` (
product_id STRING, -- (1)!
table_name STRING, -- (2)!
rule_type STRING, -- (3)!
rule STRING, -- (4)!
column_name STRING, -- (5)!
expectation STRING, -- (6)!
action_if_failed STRING, -- (7)!
tag STRING, -- (8)!
description STRING, -- (9)!
enable_for_source_dq_validation BOOLEAN, -- (10)!
enable_for_target_dq_validation BOOLEAN, -- (11)!
is_active BOOLEAN, -- (12)!
enable_error_drop_alert BOOLEAN, -- (13)!
error_drop_threshold INT, -- (14)!
query_dq_delimiter STRING, -- (15)!
enable_querydq_custom_output BOOLEAN -- (16)!
);
```

1. `product_id` A unique name at the level of dq rules execution
2. `table_name` The table for which the rule is defined
3. `rule_type` One of three rule types: 'row_dq', 'agg_dq', or 'query_dq'
4. `rule` Short description of the rule
5. `column_name` The column for which the rule is defined. Applies only to row_dq; for agg_dq and query_dq, leave it blank/empty.
6. `expectation` The DQ rule condition
7. `action_if_failed` One of three actions: 'ignore', 'drop', or 'fail'.
    Ignore: the rule runs and its output is logged; no action is taken whether the rule passes or fails. Applies to all three rule types.
    Drop: rows that fail the rule are dropped from the dataset. Applies only to the row_dq rule type.
    Fail: the DAG fails if the rule fails. Applies to all three rule types.
8. `tag` A tag for the dq rule, for example: completeness, validity, uniqueness, etc.
9. `description` Long description of the rule
10. `enable_for_source_dq_validation` Flag to run the rule against the source dataset (used by agg_dq and query_dq rules)
11. `enable_for_target_dq_validation` Flag to run the rule against the target dataset (used by agg_dq and query_dq rules)
12. `is_active` true or false, indicating whether the rule is active
13. `enable_error_drop_alert` true or false. Determines whether an alert notification is sent when rows are dropped from the dataset
14. `error_drop_threshold` Threshold that triggers the alert notification when rows are dropped from the dataset
15. `query_dq_delimiter` Delimiter used to segregate custom queries, e.g. $, @, etc.
16. `enable_querydq_custom_output` Whether custom query output is required in a separate table

The rule_type, enable_for_source_dq_validation, and enable_for_target_dq_validation columns together determine source_agg_dq, target_agg_dq, source_query_dq, and target_query_dq, as follows:

- If rule_type is row_dq, then row_dq is TRUE
- If rule_type is agg_dq and enable_for_source_dq_validation is TRUE, then source_agg_dq is TRUE
- If rule_type is agg_dq and enable_for_target_dq_validation is TRUE, then target_agg_dq is TRUE
- If rule_type is query_dq and enable_for_source_dq_validation is TRUE, then source_query_dq is TRUE
- If rule_type is query_dq and enable_for_target_dq_validation is TRUE, then target_query_dq is TRUE
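As an illustration, here is a minimal PySpark sketch that registers a single row_dq rule in the table above. It assumes an active SparkSession, that the rules table exists, and uses hypothetical product and target-table names; adjust all identifiers to your environment.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical example: drop orders with a null order_id.
# Values follow the 16-column schema defined above.
spark.sql("""
    insert into `catalog`.`schema`.`{product}_rules` values (
        'your_product',              -- product_id
        'catalog.schema.orders',     -- table_name (hypothetical)
        'row_dq',                    -- rule_type
        'order_id_is_not_null',      -- rule
        'order_id',                  -- column_name (row_dq only)
        'order_id is not null',      -- expectation
        'drop',                      -- action_if_failed
        'completeness',              -- tag
        'order_id must not be null', -- description
        true,                        -- enable_for_source_dq_validation
        true,                        -- enable_for_target_dq_validation
        true,                        -- is_active
        true,                        -- enable_error_drop_alert
        25,                          -- error_drop_threshold
        null,                        -- query_dq_delimiter (query_dq only)
        false                        -- enable_querydq_custom_output
    )
""")
```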

### Rule Type For Rules

The rules table has a column called "rule_type". It is important that this column accepts only one of
@@ -72,58 +99,105 @@ if you want to create it manually, but it is not recommended.

```sql
create table if not exists `catalog`.`schema`.`dq_stats` (
product_id STRING, -- (1)!
table_name STRING, -- (2)!
input_count LONG, -- (3)!
error_count LONG, -- (4)!
output_count LONG, -- (5)!
output_percentage FLOAT, -- (6)!
success_percentage FLOAT, -- (7)!
error_percentage FLOAT, -- (8)!
source_agg_dq_results array<map<string, string>>, -- (9)!
final_agg_dq_results array<map<string, string>>, -- (10)!
source_query_dq_results array<map<string, string>>, -- (11)!
final_query_dq_results array<map<string, string>>, -- (12)!
row_dq_res_summary array<map<string, string>>, -- (13)!
row_dq_error_threshold array<map<string, string>>, -- (14)!
dq_status map<string, string>, -- (15)!
dq_run_time map<string, float>, -- (16)!
dq_rules map<string, map<string,int>>, -- (17)!
meta_dq_run_id STRING, -- (18)!
meta_dq_run_date DATE, -- (19)!
meta_dq_run_datetime TIMESTAMP -- (20)!
);
```

1. `product_id` A unique name at the level of dq rules execution
2. `table_name` The table for which the rules are run
3. `input_count` Total input row count of the given dataframe
4. `error_count` Total error count across all row_dq rules
5. `output_count` Total count of records that passed the row_dq rules or were configured to be ignored on failure
6. `output_percentage` Percentage of records that passed the row_dq rules or were configured to be ignored on failure
7. `success_percentage` Percentage of records that passed the row_dq rules
8. `error_percentage` Percentage of records that failed the row_dq rules
9. `source_agg_dq_results` Results of the agg dq rules run against the source dataset
10. `final_agg_dq_results` Results of the agg dq rules run after the row_dq rules were executed
11. `source_query_dq_results` Results of the query dq rules run against the source dataset
12. `final_query_dq_results` Results of the query dq rules run after the row_dq rules were executed
13. `row_dq_res_summary` Summary of the row dq results
14. `row_dq_error_threshold` Error thresholds for the row_dq rules, as defined in the rules table
15. `dq_status` Status of the rule execution
16. `dq_run_time` Time taken by the rules
17. `dq_rules` Count of dq rules executed in this run
18. `meta_dq_run_id` Unique id generated for this run
19. `meta_dq_run_date` Date on which the rules were executed
20. `meta_dq_run_datetime` Date and time at which the rules were executed
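For reference, a short PySpark sketch (illustrative only; it assumes a SparkSession named `spark` and the placeholder table name from the DDL above) that surfaces the most recent runs recorded in the stats table:

```python
# Show the ten most recent dq runs with their headline counts.
recent_runs = (
    spark.table("catalog.schema.dq_stats")
    .select("product_id", "table_name", "input_count", "error_count",
            "error_percentage", "dq_status", "meta_dq_run_datetime")
    .orderBy("meta_dq_run_datetime", ascending=False)
    .limit(10)
)
recent_runs.show(truncate=False)
```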

### DQ Detailed Stats Table

This table provides detailed stats for all the expectations, along with their status, in a relational format, complementing the stats table.
This table does not need to be created; it is auto-created by appending "_detailed" to the dq stats table name. It is optional and is only created if the config enables the detailed stats table.
Below is the schema:


```sql
create table if not exists `catalog`.`schema`.`dq_stats_detailed` (
run_id string, -- (1)!
product_id string, -- (2)!
table_name string, -- (3)!
rule_type string, -- (4)!
rule string, -- (5)!
source_expectations string, -- (6)!
tag string, -- (7)!
description string, -- (8)!
source_dq_status string, -- (9)!
source_dq_actual_outcome string, -- (10)!
source_dq_expected_outcome string, -- (11)!
source_dq_actual_row_count string, -- (12)!
source_dq_error_row_count string, -- (13)!
source_dq_row_count string, -- (14)!
target_expectations string, -- (15)!
target_dq_status string, -- (16)!
target_dq_actual_outcome string, -- (17)!
target_dq_expected_outcome string, -- (18)!
target_dq_actual_row_count string, -- (19)!
target_dq_error_row_count string, -- (20)!
target_dq_row_count string, -- (21)!
dq_date date, -- (22)!
dq_time string -- (23)!
);
```

1. `run_id` Run id for a specific run
2. `product_id` Unique product identifier
3. `table_name` The target table where the final data is inserted
4. `rule_type` One of row/agg/query dq
5. `rule` Rule name
6. `source_expectations` Actual rule executed in the source dq check
7. `tag` Tag of the rule: completeness, uniqueness, validity, accuracy, consistency, etc.
8. `description` Description of the rule
9. `source_dq_status` Status of the rule execution in the source dq check
10. `source_dq_actual_outcome` Actual outcome of the source dq check
11. `source_dq_expected_outcome` Expected outcome of the source dq check
12. `source_dq_actual_row_count` Actual row count from the source dq check
13. `source_dq_error_row_count` Number of rows that failed the source dq check
14. `source_dq_row_count` Row count of the source dq check
15. `target_expectations` Actual rule executed in the target dq check
16. `target_dq_status` Status of the rule execution in the target dq check
17. `target_dq_actual_outcome` Actual outcome of the target dq check
18. `target_dq_expected_outcome` Expected outcome of the target dq check
19. `target_dq_actual_row_count` Actual row count from the target dq check
20. `target_dq_error_row_count` Number of rows that failed the target dq check
21. `target_dq_row_count` Row count of the target dq check
22. `dq_date` Date on which the dq check was executed
23. `dq_time` Timestamp at which the dq check was executed
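A hedged PySpark sketch of how this table can be queried; the table name follows the auto-created "_detailed" suffix described above, and the 'fail' status value is an assumption for illustration:

```python
# List expectations that failed on the source side, per run.
failed_source = (
    spark.table("catalog.schema.dq_stats_detailed")
    .where("source_dq_status = 'fail'")  # assumed status value
    .select("run_id", "rule_type", "rule", "source_expectations",
            "source_dq_error_row_count", "dq_date")
)
failed_source.show(truncate=False)
```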
8 changes: 4 additions & 4 deletions spark_expectations/config/user_config.py
@@ -61,11 +61,11 @@ class Constants:
dbx_secret_token = "se.streaming.dbx.secret.token"
dbx_topic_name = "se.streaming.dbx.topic.name"

- # declare const user config variables for agg query dq detailed satats
- enable_agg_dq_detailed_result = "spark.expectations.agg.dq.detailed.stats"
- enable_query_dq_detailed_result = "spark.expectations.query.dq.detailed.stats"
+ # declare const user config variables for agg query dq detailed stats
+ se_enable_agg_dq_detailed_result = "spark.expectations.agg.dq.detailed.stats"
+ se_enable_query_dq_detailed_result = "spark.expectations.query.dq.detailed.stats"

querydq_output_custom_table_name = "spark.expectations.query.dq.custom.table_name"

- # declare const variable for agg query dq detailed satats
+ # declare const variable for agg query dq detailed stats
se_agg_dq_expectation_regex_pattern = r"(\w+\(.+?\))([<>!=]+.+)$"
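With this rename, user code should reference the `se_`-prefixed constants. A minimal sketch of a user configuration dict, mirroring the example scripts updated in this commit (the custom output table name is hypothetical):

```python
from spark_expectations.config.user_config import Constants as user_config

se_user_conf = {
    user_config.se_enable_agg_dq_detailed_result: True,
    user_config.se_enable_query_dq_detailed_result: True,
    # Hypothetical custom table for query dq output
    user_config.querydq_output_custom_table_name: "dq_spark_local.dq_stats_querydq_output",
}
```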
12 changes: 6 additions & 6 deletions spark_expectations/core/expectations.py
@@ -119,8 +119,8 @@ def _except(func: Any) -> Any:
user_config.se_notifications_on_fail: True,
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: False,
user_config.se_notifications_on_error_drop_threshold: 100,
- user_config.enable_agg_dq_detailed_result: False,
- user_config.enable_query_dq_detailed_result: False,
+ user_config.se_enable_agg_dq_detailed_result: False,
+ user_config.se_enable_query_dq_detailed_result: False,
user_config.querydq_output_custom_table_name: f"{self.stats_table}_querydq_output",
}

@@ -165,18 +165,18 @@ def _except(func: Any) -> Any:
)

_agg_dq_detailed_stats: bool = (
- bool(_notification_dict[user_config.enable_agg_dq_detailed_result])
+ bool(_notification_dict[user_config.se_enable_agg_dq_detailed_result])
if isinstance(
- _notification_dict[user_config.enable_agg_dq_detailed_result],
+ _notification_dict[user_config.se_enable_agg_dq_detailed_result],
bool,
)
else False
)

_query_dq_detailed_stats: bool = (
- bool(_notification_dict[user_config.enable_query_dq_detailed_result])
+ bool(_notification_dict[user_config.se_enable_query_dq_detailed_result])
if isinstance(
- _notification_dict[user_config.enable_query_dq_detailed_result],
+ _notification_dict[user_config.se_enable_query_dq_detailed_result],
bool,
)
else False
2 changes: 2 additions & 0 deletions spark_expectations/examples/sample_dq_bigquery.py
@@ -56,6 +56,8 @@
user_config.se_notifications_on_fail: True,
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
user_config.se_notifications_on_error_drop_threshold: 15,
+ user_config.se_enable_query_dq_detailed_result: True,
+ user_config.se_enable_agg_dq_detailed_result: True,
user_config.se_enable_error_table: True,
user_config.se_dq_rules_params: {
"env": "local",
4 changes: 2 additions & 2 deletions spark_expectations/examples/sample_dq_delta.py
@@ -39,8 +39,8 @@
user_config.se_notifications_on_fail: True,
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
user_config.se_notifications_on_error_drop_threshold: 15,
- user_config.enable_query_dq_detailed_result: True,
- user_config.enable_agg_dq_detailed_result: True,
+ user_config.se_enable_query_dq_detailed_result: True,
+ user_config.se_enable_agg_dq_detailed_result: True,
# user_config.querydq_output_custom_table_name: "dq_spark_local.dq_stats_detailed_outputt",
user_config.se_enable_error_table: True,
user_config.se_dq_rules_params: {
4 changes: 2 additions & 2 deletions spark_expectations/examples/sample_dq_iceberg.py
@@ -39,9 +39,9 @@
user_config.se_notifications_on_fail: True,
user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
user_config.se_notifications_on_error_drop_threshold: 15,
+ user_config.se_enable_query_dq_detailed_result: True,
+ user_config.se_enable_agg_dq_detailed_result: True,
  user_config.se_enable_error_table: True,
- user_config.enable_query_dq_detailed_result: True,
- user_config.enable_agg_dq_detailed_result: True,
user_config.se_dq_rules_params: {
"env": "local",
"table": "product",
8 changes: 4 additions & 4 deletions spark_expectations/sinks/utils/writer.py
@@ -248,7 +248,7 @@ def _create_dataframe(
"""
return self.spark.createDataFrame(data, schema=schema)

- def _prep_secondary_query_output(self) -> List[Tuple]:
+ def _prep_secondary_query_output(self) -> DataFrame:
"""
Prepares the secondary query output by performing various transformations and joins.
@@ -341,7 +341,7 @@ def _prep_detailed_stats(
_source_querydq_detailed_stats_result: Optional[List[Tuple]],
_target_aggdq_detailed_stats_result: Optional[List[Tuple]],
_target_querydq_detailed_stats_result: Optional[List[Tuple]],
- ) -> Optional[List[Tuple]]:
+ ) -> DataFrame:
"""
Prepares detailed statistics for the data quality checks.
@@ -524,7 +524,7 @@ def write_detailed_stats(self) -> None:

_log.info(
"Writing metrics to the querydq custom output table: %s, ended",
- {self._context.get_query_dq_output_custom_table_name},
+ self._context.get_query_dq_output_custom_table_name,
)

_df_custom_detailed_stats_source = self._prep_secondary_query_output()
@@ -737,7 +737,7 @@ def write_error_stats(self) -> None:

_log.info(
"Writing metrics to the stats table: %s, ended",
- {self._context.get_dq_stats_table_name},
+ self._context.get_dq_stats_table_name,
)

# TODO check if streaming_stats is set to off, if it's enabled only then this should run
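The logging changes in write_detailed_stats and write_error_stats fix a subtle bug: wrapping the lazy `%s` argument in braces builds a one-element Python set, so the message renders as `{'value'}` instead of the bare value. A minimal standalone illustration:

```python
import logging

logging.basicConfig(level=logging.INFO)
_log = logging.getLogger(__name__)

table = "catalog.schema.dq_stats"
# Before: braces create a set literal -> "{'catalog.schema.dq_stats'}"
_log.info("Writing metrics to the stats table: %s, ended", {table})
# After: pass the value directly -> "catalog.schema.dq_stats"
_log.info("Writing metrics to the stats table: %s, ended", table)
```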
